use std::borrow::Cow;
use std::sync::Arc;
use std::time::Duration;
use rand::{RngExt, rngs::StdRng};
use reqwest::{
RequestBuilder, Response, StatusCode,
header::{self, AUTHORIZATION, HeaderMap, HeaderValue},
};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use crate::error::{Error, ErrorResponse};
pub mod error;
pub mod line_login;
pub mod messaging_api;
#[cfg(feature = "mock")]
pub mod mock;
const PREFIX_URL: &str = "https://api.line.me";
const ENV_KEY: &str = "LINE_API_PREFIX_URL";
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LineResponseHeader {
pub request_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub accepted_request_id: Option<String>,
}
const REDACTED: &str = "***";
#[derive(Clone)]
pub struct LineRequestLog<'a> {
headers: Option<&'a HeaderMap>,
body: &'a serde_json::Value,
}
impl std::fmt::Debug for LineRequestLog<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LineRequestLog")
.field("headers", &self.headers_redacted())
.field("body", &self.body_redacted())
.finish()
}
}
impl<'a> LineRequestLog<'a> {
pub fn headers(&self) -> Option<&'a HeaderMap> {
self.headers
}
pub fn body(&self) -> &'a serde_json::Value {
self.body
}
pub fn headers_redacted(&self) -> Option<HeaderMap> {
self.headers.map(redact_headers)
}
pub fn body_redacted(&self) -> serde_json::Value {
redact_body(self.body)
}
}
#[derive(Debug, Clone)]
pub enum ResponseBody {
Json(serde_json::Value),
Raw(String),
}
#[derive(Clone)]
pub struct LineResponseLog<'a> {
headers: &'a HeaderMap,
body: ResponseBody,
status_code: StatusCode,
}
impl std::fmt::Debug for LineResponseLog<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LineResponseLog")
.field("headers", &self.headers)
.field("status_code", &self.status_code)
.field("body", &self.body_redacted())
.finish()
}
}
impl<'a> LineResponseLog<'a> {
pub fn headers(&self) -> &'a HeaderMap {
self.headers
}
pub fn as_value(&self) -> Cow<'_, serde_json::Value> {
match &self.body {
ResponseBody::Json(value) => Cow::Borrowed(value),
ResponseBody::Raw(text) => Cow::Owned(serde_json::Value::String(text.clone())),
}
}
pub fn status_code(&self) -> StatusCode {
self.status_code
}
pub fn body_was_json(&self) -> bool {
matches!(self.body, ResponseBody::Json(_))
}
pub fn body_redacted(&self) -> serde_json::Value {
redact_body(&self.as_value())
}
}
pub const REDACTED_BODY_KEYS: &[&str] = &[
"access_token",
"refresh_token",
"client_secret",
"code",
"code_verifier",
"id_token",
"useraccesstoken",
];
fn redact_headers(headers: &HeaderMap) -> HeaderMap {
let mut redacted = headers.clone();
if redacted.contains_key(AUTHORIZATION) {
redacted.insert(AUTHORIZATION, HeaderValue::from_static(REDACTED));
}
redacted
}
fn redact_body(value: &serde_json::Value) -> serde_json::Value {
match value {
serde_json::Value::Object(map) => serde_json::Value::Object(
map.iter()
.map(|(key, val)| {
if REDACTED_BODY_KEYS.contains(&key.to_ascii_lowercase().as_str()) {
(key.clone(), serde_json::Value::String(REDACTED.to_string()))
} else {
(key.clone(), redact_body(val))
}
})
.collect(),
),
serde_json::Value::Array(items) => {
serde_json::Value::Array(items.iter().map(redact_body).collect())
}
other => other.clone(),
}
}
pub(crate) fn serialize_log_body<T: Serialize>(value: &T) -> serde_json::Value {
serde_json::to_value(value)
.unwrap_or_else(|err| serde_json::json!({ "_serialize_error": err.to_string() }))
}
fn run_log_callback(label: &str, f: impl FnOnce()) {
if let Err(payload) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
let msg = payload
.downcast_ref::<&str>()
.map(|s| s.to_string())
.or_else(|| payload.downcast_ref::<String>().cloned())
.unwrap_or_else(|| "<non-string panic payload>".to_string());
tracing::error!(
callback = label,
panic = %msg,
"LineOptions callback panicked; ignored to keep the API call alive"
);
}
}
pub type OnRequest = Arc<dyn Fn(&LineRequestLog) + Send + Sync>;
pub type OnResponse = Arc<dyn Fn(&LineRequestLog, &LineResponseLog) + Send + Sync>;
#[derive(Default, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct LineOptions {
pub(crate) prefix_url: Option<String>,
pub(crate) timeout_duration: Option<Duration>,
pub(crate) try_count: Option<u8>,
pub(crate) retry_duration: Option<Duration>,
#[serde(skip)]
pub(crate) on_request: Option<OnRequest>,
#[serde(skip)]
pub(crate) on_response: Option<OnResponse>,
}
impl std::fmt::Debug for LineOptions {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LineOptions")
.field("prefix_url", &self.prefix_url)
.field("timeout_duration", &self.timeout_duration)
.field("try_count", &self.try_count)
.field("retry_duration", &self.retry_duration)
.field("on_request", &self.on_request.as_ref().map(|_| "Fn"))
.field("on_response", &self.on_response.as_ref().map(|_| "Fn"))
.finish()
}
}
impl LineOptions {
pub fn get_try_count(&self) -> u8 {
self.try_count.unwrap_or(1).max(1)
}
pub fn get_retry_duration(&self) -> Duration {
self.retry_duration.unwrap_or(Duration::from_secs(0))
}
pub fn get_timeout_duration(&self) -> Duration {
self.timeout_duration.unwrap_or(Duration::from_secs(0))
}
pub fn get_prefix_url(&self) -> String {
self.resolve_prefix_url()
}
pub(crate) fn resolve_prefix_url(&self) -> String {
self.prefix_url
.clone()
.unwrap_or_else(|| std::env::var(ENV_KEY).unwrap_or_else(|_| PREFIX_URL.to_string()))
}
pub fn with_prefix_url(mut self, prefix_url: impl Into<String>) -> Self {
self.prefix_url = Some(prefix_url.into());
self
}
pub fn with_timeout_duration(mut self, timeout_duration: Duration) -> Self {
self.timeout_duration = Some(timeout_duration);
self
}
pub fn with_try_count(mut self, try_count: u8) -> Self {
self.try_count = Some(try_count);
self
}
pub fn with_retry_duration(mut self, retry_duration: Duration) -> Self {
self.retry_duration = Some(retry_duration);
self
}
pub fn with_on_request(mut self, f: impl Fn(&LineRequestLog) + Send + Sync + 'static) -> Self {
self.on_request = Some(Arc::new(f));
self
}
pub fn with_on_response(
mut self,
f: impl Fn(&LineRequestLog, &LineResponseLog) + Send + Sync + 'static,
) -> Self {
self.on_response = Some(Arc::new(f));
self
}
}
pub(crate) fn make_url(postfix_url: &str, options: &LineOptions) -> String {
format!("{}{postfix_url}", options.resolve_prefix_url())
}
pub(crate) fn apply_auth(builder: RequestBuilder, channel_access_token: &str) -> RequestBuilder {
builder.header(AUTHORIZATION, format!("Bearer {channel_access_token}"))
}
pub(crate) fn apply_timeout(builder: RequestBuilder, options: &LineOptions) -> RequestBuilder {
let timeout_duration = options.get_timeout_duration();
if timeout_duration.is_zero() {
builder
} else {
builder.timeout(timeout_duration)
}
}
pub(crate) fn is_standard_retry(status_code: StatusCode) -> bool {
status_code.is_server_error() || status_code == StatusCode::TOO_MANY_REQUESTS
}
pub(crate) fn make_line_header(response: &Response) -> LineResponseHeader {
let headers: &header::HeaderMap = response.headers();
let request_id = headers
.get("X-Line-Request-Id")
.map(|it| {
it.to_str().unwrap_or_else(|_| {
tracing::warn!("X-Line-Request-Id present but not valid ASCII; recording empty");
""
})
})
.unwrap_or("");
let accepted_request_id = headers.get("X-Line-Accepted-Request-Id").map(|it| {
it.to_str()
.unwrap_or_else(|_| {
tracing::warn!(
"X-Line-Accepted-Request-Id present but not valid ASCII; recording empty"
);
""
})
.to_string()
});
LineResponseHeader {
request_id: request_id.to_owned(),
accepted_request_id,
}
}
pub(crate) fn calc_retry_duration(
retry_duration: Duration,
try_count: u32,
rng: &mut StdRng,
) -> Duration {
let jitter = Duration::from_millis(rng.random_range(0..100));
let retry_count = 2u64.pow(try_count) as u32;
retry_duration * retry_count + jitter
}
pub(crate) async fn execute_api_raw(
builder: RequestBuilder,
allow_conflict: bool,
options: &LineOptions,
request_value: &serde_json::Value,
) -> Result<(serde_json::Value, LineResponseHeader, StatusCode), Box<Error>> {
let need_log = options.on_request.is_some() || options.on_response.is_some();
let request_headers: Option<HeaderMap> = if need_log {
builder
.try_clone()
.and_then(|b| b.build().ok())
.map(|req| req.headers().clone())
} else {
None
};
if let Some(cb) = &options.on_request {
run_log_callback("on_request", || {
cb(&LineRequestLog {
headers: request_headers.as_ref(),
body: request_value,
});
});
}
let response = builder
.send()
.await
.map_err(|err| Box::new(Error::Reqwest(err)))?;
let status_code = response.status();
let line_header = make_line_header(&response);
let response_headers = if options.on_response.is_some() {
response.headers().clone()
} else {
HeaderMap::new()
};
let text = response
.text()
.await
.map_err(|err| Box::new(Error::Reqwest(err)))?;
let json_result = serde_json::from_str::<serde_json::Value>(&text);
if let Some(cb) = &options.on_response {
let response_body = match json_result.as_ref() {
Ok(value) => ResponseBody::Json(value.clone()),
Err(_) => ResponseBody::Raw(text.clone()),
};
run_log_callback("on_response", || {
cb(
&LineRequestLog {
headers: request_headers.as_ref(),
body: request_value,
},
&LineResponseLog {
headers: &response_headers,
body: response_body,
status_code,
},
);
});
}
let Ok(json) = json_result else {
return Err(Box::new(Error::OtherText(text, status_code, line_header)));
};
if status_code.is_success() || (allow_conflict && status_code == StatusCode::CONFLICT) {
Ok((json, line_header, status_code))
} else {
match serde_json::from_value::<ErrorResponse>(json.clone()) {
Ok(error_response) => Err(Box::new(Error::Line(
error_response,
status_code,
line_header,
))),
Err(_) => Err(Box::new(Error::OtherJson(json, status_code, line_header))),
}
}
}
const HEADER_RETRY_KEY: &str = "X-Line-Retry-Key";
pub(crate) async fn execute_api<T, F>(
f: impl Fn() -> RequestBuilder,
options: &LineOptions,
is_retry: F,
retry_key: Option<String>,
request_value_fn: impl FnOnce() -> serde_json::Value,
) -> Result<(T, LineResponseHeader), Box<Error>>
where
T: DeserializeOwned,
F: Fn(StatusCode) -> bool,
{
let mut res = Err(Error::Invalid("fail loop".to_string()));
let try_count = options.get_try_count();
let retry_duration: Duration = options.get_retry_duration();
let request_value = if options.on_request.is_some() || options.on_response.is_some() {
request_value_fn()
} else {
serde_json::Value::Null
};
let mut rng: StdRng = rand::make_rng();
for i in 0..try_count {
let mut builder = f();
if let Some(retry_key) = &retry_key
&& try_count > 1
{
builder = builder.header(HEADER_RETRY_KEY, retry_key);
}
match execute_api_raw(builder, retry_key.is_some(), options, &request_value).await {
Ok((json, line_header, status_code)) => {
res = match serde_json::from_value(json.clone()) {
Ok(data) => Ok((data, line_header)),
Err(_err) => match serde_json::from_value::<ErrorResponse>(json.clone()) {
Ok(error_response) => {
Err(Error::Line(error_response, status_code, line_header))
}
Err(_) => Err(Error::OtherJson(json, status_code, line_header)),
},
};
break;
}
Err(err) => {
tracing::debug!("error: {:?}", err);
if !is_retry(
err.status_code()
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
) {
res = Err(*err);
break;
}
if i + 1 >= try_count {
res = Err(*err);
} else if !retry_duration.is_zero() {
tokio::time::sleep(calc_retry_duration(retry_duration, i as u32, &mut rng))
.await;
}
}
}
}
res.map_err(Box::new)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_serialize_log_body_success() {
let value = serialize_log_body(&serde_json::json!({"a": 1}));
assert_eq!(value, serde_json::json!({"a": 1}));
}
#[test]
fn test_serialize_log_body_failure_sentinel() {
use std::collections::HashMap;
let mut map: HashMap<Vec<i32>, i32> = HashMap::new();
map.insert(vec![1, 2], 3);
let value = serialize_log_body(&map);
assert!(
value.get("_serialize_error").is_some(),
"expected serialize error sentinel, got: {value}"
);
assert_ne!(value, serde_json::Value::Null);
}
#[test]
fn test_redact_body_masks_known_keys_recursively() {
let input = serde_json::json!({
"client_secret": "secret",
"grant_type": "authorization_code",
"nested": { "refresh_token": "rt", "keep": "v" },
"list": [ { "access_token": "at" } ],
});
let out = redact_body(&input);
assert_eq!(out["client_secret"], "***");
assert_eq!(out["grant_type"], "authorization_code");
assert_eq!(out["nested"]["refresh_token"], "***");
assert_eq!(out["nested"]["keep"], "v");
assert_eq!(out["list"][0]["access_token"], "***");
}
#[test]
fn test_redact_body_case_insensitive() {
let input = serde_json::json!({ "userAccessToken": "x", "ID_TOKEN": "y" });
let out = redact_body(&input);
assert_eq!(out["userAccessToken"], "***");
assert_eq!(out["ID_TOKEN"], "***");
}
#[test]
fn test_redact_deauthorize_request_body() {
use crate::line_login::post_user_v1_deauthorize::RequestBody;
let body = RequestBody {
user_access_token: "super-secret-token".to_string(),
};
let value = serialize_log_body(&body);
assert_eq!(value["userAccessToken"], "super-secret-token");
let redacted = redact_body(&value);
assert_eq!(redacted["userAccessToken"], "***");
}
#[cfg(feature = "mock")]
#[tokio::test]
async fn test_no_callback_skips_request_value_fn() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("GET", "/test")
.with_status(200)
.with_body("{}")
.create_async()
.await;
let url = format!("{}/test", server.url());
let options = LineOptions::default();
let result: Result<(serde_json::Value, LineResponseHeader), _> = execute_api(
|| reqwest::Client::new().get(&url),
&options,
is_standard_retry,
None,
|| panic!("request_value_fn must not be called when no callback is set"),
)
.await;
assert!(result.is_ok());
mock.assert_async().await;
}
#[cfg(feature = "mock")]
#[tokio::test]
async fn test_callback_panic_does_not_fail_api() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("GET", "/test")
.with_status(200)
.with_body("{}")
.create_async()
.await;
let url = format!("{}/test", server.url());
let options = LineOptions::default()
.with_on_request(|_log| panic!("on_request callback panics"))
.with_on_response(|_req, _res| panic!("on_response callback panics"));
let result: Result<(serde_json::Value, LineResponseHeader), _> = execute_api(
|| reqwest::Client::new().get(&url),
&options,
is_standard_retry,
None,
|| serde_json::Value::Null,
)
.await;
assert!(result.is_ok(), "callback panic must not fail the API call");
mock.assert_async().await;
}
#[test]
fn test_get_try_count_normalizes_zero() {
assert_eq!(LineOptions::default().get_try_count(), 1, "None は 1");
assert_eq!(
LineOptions::default().with_try_count(0).get_try_count(),
1,
"0 は 1 に正規化"
);
assert_eq!(
LineOptions::default().with_try_count(3).get_try_count(),
3,
"それ以外はそのまま"
);
assert_eq!(LineOptions::default().with_try_count(0).try_count, Some(0));
}
#[test]
fn test_line_options_serde_round_trip_drops_callbacks() {
let options = LineOptions::default()
.with_prefix_url("https://example.com")
.with_try_count(3)
.with_on_request(|_log| {})
.with_on_response(|_req, _res| {});
assert!(options.on_request.is_some());
let json = serde_json::to_string(&options).unwrap();
let restored: LineOptions = serde_json::from_str(&json).unwrap();
assert_eq!(restored.prefix_url.as_deref(), Some("https://example.com"));
assert_eq!(restored.try_count, Some(3));
assert!(restored.on_request.is_none());
assert!(restored.on_response.is_none());
}
#[test]
fn test_request_log_headers_none_contract() {
let body = serde_json::Value::Null;
let log = LineRequestLog {
headers: None,
body: &body,
};
assert!(log.headers().is_none());
assert!(log.headers_redacted().is_none());
}
#[test]
fn test_redact_body_masks_generic_code_in_response() {
let response = serde_json::json!({
"message": "invalid request",
"code": "40000",
});
let redacted = redact_body(&response);
assert_eq!(redacted["code"], "***", "汎用 code も意図的にマスクされる");
assert_eq!(redacted["message"], "invalid request");
}
}