rustack_logs_http/
service.rs1use std::{convert::Infallible, future::Future, pin::Pin, sync::Arc};
4
5use bytes::Bytes;
6use http_body_util::BodyExt;
7use hyper::body::Incoming;
8use rustack_logs_model::error::LogsError;
9
10use crate::{
11 body::LogsResponseBody,
12 dispatch::{LogsHandler, dispatch_operation},
13 response::{CONTENT_TYPE, error_to_response},
14 router::resolve_operation,
15};
16
17#[derive(Clone)]
19pub struct LogsHttpConfig {
20 pub skip_signature_validation: bool,
22 pub region: String,
24 pub credential_provider: Option<Arc<dyn rustack_auth::CredentialProvider>>,
26}
27
28impl std::fmt::Debug for LogsHttpConfig {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 f.debug_struct("LogsHttpConfig")
31 .field("skip_signature_validation", &self.skip_signature_validation)
32 .field("region", &self.region)
33 .field(
34 "credential_provider",
35 &self.credential_provider.as_ref().map(|_| "..."),
36 )
37 .finish()
38 }
39}
40
41impl Default for LogsHttpConfig {
42 fn default() -> Self {
43 Self {
44 skip_signature_validation: true,
45 region: "us-east-1".to_owned(),
46 credential_provider: None,
47 }
48 }
49}
50
51#[derive(Debug)]
56pub struct LogsHttpService<H: LogsHandler> {
57 handler: Arc<H>,
58 config: Arc<LogsHttpConfig>,
59}
60
61impl<H: LogsHandler> LogsHttpService<H> {
62 pub fn new(handler: Arc<H>, config: LogsHttpConfig) -> Self {
64 Self {
65 handler,
66 config: Arc::new(config),
67 }
68 }
69}
70
71impl<H: LogsHandler> Clone for LogsHttpService<H> {
72 fn clone(&self) -> Self {
73 Self {
74 handler: Arc::clone(&self.handler),
75 config: Arc::clone(&self.config),
76 }
77 }
78}
79
80impl<H: LogsHandler> hyper::service::Service<http::Request<Incoming>> for LogsHttpService<H> {
81 type Response = http::Response<LogsResponseBody>;
82 type Error = Infallible;
83 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
84
85 fn call(&self, req: http::Request<Incoming>) -> Self::Future {
86 let handler = Arc::clone(&self.handler);
87 let config = Arc::clone(&self.config);
88 let request_id = uuid::Uuid::new_v4().to_string();
89
90 Box::pin(async move {
91 let response = process_request(req, handler.as_ref(), &config, &request_id).await;
92 let response = add_common_headers(response, &request_id);
93 Ok(response)
94 })
95 }
96}
97
98async fn process_request<H: LogsHandler>(
100 req: http::Request<Incoming>,
101 handler: &H,
102 config: &LogsHttpConfig,
103 request_id: &str,
104) -> http::Response<LogsResponseBody> {
105 let (parts, incoming) = req.into_parts();
106
107 if parts.method != http::Method::POST {
109 let err = LogsError::validation(format!(
110 "CloudWatch Logs requires POST method, got {}",
111 parts.method,
112 ));
113 return error_to_response(&err, request_id);
114 }
115
116 let op = match resolve_operation(&parts.headers) {
118 Ok(op) => op,
119 Err(err) => return error_to_response(&err, request_id),
120 };
121
122 let body = match collect_body(incoming).await {
124 Ok(body) => body,
125 Err(err) => return error_to_response(&err, request_id),
126 };
127
128 if !config.skip_signature_validation {
130 if let Some(ref cred_provider) = config.credential_provider {
131 let body_hash = rustack_auth::hash_payload(&body);
132 if let Err(auth_err) =
133 rustack_auth::verify_sigv4(&parts, &body_hash, cred_provider.as_ref())
134 {
135 let err = LogsError::with_message(
136 rustack_logs_model::error::LogsErrorCode::ValidationException,
137 auth_err.to_string(),
138 );
139 return error_to_response(&err, request_id);
140 }
141 }
142 }
143
144 match dispatch_operation(handler, op, body).await {
146 Ok(response) => response,
147 Err(err) => error_to_response(&err, request_id),
148 }
149}
150
151async fn collect_body(incoming: Incoming) -> Result<Bytes, LogsError> {
153 incoming
154 .collect()
155 .await
156 .map(http_body_util::Collected::to_bytes)
157 .map_err(|e| LogsError::internal_error(format!("Failed to read request body: {e}")))
158}
159
160fn add_common_headers(
162 mut response: http::Response<LogsResponseBody>,
163 request_id: &str,
164) -> http::Response<LogsResponseBody> {
165 let headers = response.headers_mut();
166
167 if let Ok(hv) = http::HeaderValue::from_str(request_id) {
168 headers.entry("x-amzn-requestid").or_insert(hv);
169 }
170
171 headers
172 .entry("content-type")
173 .or_insert(http::HeaderValue::from_static(CONTENT_TYPE));
174
175 headers.insert("server", http::HeaderValue::from_static("Rustack"));
176
177 headers.insert(
179 "access-control-allow-origin",
180 http::HeaderValue::from_static("*"),
181 );
182
183 response
184}