Skip to main content

rustack_logs_http/
service.rs

1//! CloudWatch Logs HTTP service implementing the hyper `Service` trait.
2
3use std::{convert::Infallible, future::Future, pin::Pin, sync::Arc};
4
5use bytes::Bytes;
6use http_body_util::BodyExt;
7use hyper::body::Incoming;
8use rustack_logs_model::error::LogsError;
9
10use crate::{
11    body::LogsResponseBody,
12    dispatch::{LogsHandler, dispatch_operation},
13    response::{CONTENT_TYPE, error_to_response},
14    router::resolve_operation,
15};
16
17/// Configuration for the CloudWatch Logs HTTP service.
18#[derive(Clone)]
19pub struct LogsHttpConfig {
20    /// Whether to skip AWS signature validation.
21    pub skip_signature_validation: bool,
22    /// The AWS region this service is running in.
23    pub region: String,
24    /// Credential provider for signature validation.
25    pub credential_provider: Option<Arc<dyn rustack_auth::CredentialProvider>>,
26}
27
28impl std::fmt::Debug for LogsHttpConfig {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        f.debug_struct("LogsHttpConfig")
31            .field("skip_signature_validation", &self.skip_signature_validation)
32            .field("region", &self.region)
33            .field(
34                "credential_provider",
35                &self.credential_provider.as_ref().map(|_| "..."),
36            )
37            .finish()
38    }
39}
40
41impl Default for LogsHttpConfig {
42    fn default() -> Self {
43        Self {
44            skip_signature_validation: true,
45            region: "us-east-1".to_owned(),
46            credential_provider: None,
47        }
48    }
49}
50
51/// Hyper `Service` implementation for CloudWatch Logs.
52///
53/// Wraps a [`LogsHandler`] implementation and routes incoming HTTP
54/// requests to the appropriate CloudWatch Logs operation handler.
55#[derive(Debug)]
56pub struct LogsHttpService<H: LogsHandler> {
57    handler: Arc<H>,
58    config: Arc<LogsHttpConfig>,
59}
60
61impl<H: LogsHandler> LogsHttpService<H> {
62    /// Create a new `LogsHttpService`.
63    pub fn new(handler: Arc<H>, config: LogsHttpConfig) -> Self {
64        Self {
65            handler,
66            config: Arc::new(config),
67        }
68    }
69}
70
71impl<H: LogsHandler> Clone for LogsHttpService<H> {
72    fn clone(&self) -> Self {
73        Self {
74            handler: Arc::clone(&self.handler),
75            config: Arc::clone(&self.config),
76        }
77    }
78}
79
80impl<H: LogsHandler> hyper::service::Service<http::Request<Incoming>> for LogsHttpService<H> {
81    type Response = http::Response<LogsResponseBody>;
82    type Error = Infallible;
83    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
84
85    fn call(&self, req: http::Request<Incoming>) -> Self::Future {
86        let handler = Arc::clone(&self.handler);
87        let config = Arc::clone(&self.config);
88        let request_id = uuid::Uuid::new_v4().to_string();
89
90        Box::pin(async move {
91            let response = process_request(req, handler.as_ref(), &config, &request_id).await;
92            let response = add_common_headers(response, &request_id);
93            Ok(response)
94        })
95    }
96}
97
98/// Process a single CloudWatch Logs HTTP request through the full pipeline.
99async fn process_request<H: LogsHandler>(
100    req: http::Request<Incoming>,
101    handler: &H,
102    config: &LogsHttpConfig,
103    request_id: &str,
104) -> http::Response<LogsResponseBody> {
105    let (parts, incoming) = req.into_parts();
106
107    // 1. Verify POST method (CloudWatch Logs only accepts POST).
108    if parts.method != http::Method::POST {
109        let err = LogsError::validation(format!(
110            "CloudWatch Logs requires POST method, got {}",
111            parts.method,
112        ));
113        return error_to_response(&err, request_id);
114    }
115
116    // 2. Route: extract operation from X-Amz-Target header.
117    let op = match resolve_operation(&parts.headers) {
118        Ok(op) => op,
119        Err(err) => return error_to_response(&err, request_id),
120    };
121
122    // 3. Collect body.
123    let body = match collect_body(incoming).await {
124        Ok(body) => body,
125        Err(err) => return error_to_response(&err, request_id),
126    };
127
128    // 4. Authenticate (if enabled).
129    if !config.skip_signature_validation {
130        if let Some(ref cred_provider) = config.credential_provider {
131            let body_hash = rustack_auth::hash_payload(&body);
132            if let Err(auth_err) =
133                rustack_auth::verify_sigv4(&parts, &body_hash, cred_provider.as_ref())
134            {
135                let err = LogsError::with_message(
136                    rustack_logs_model::error::LogsErrorCode::ValidationException,
137                    auth_err.to_string(),
138                );
139                return error_to_response(&err, request_id);
140            }
141        }
142    }
143
144    // 5. Dispatch to handler.
145    match dispatch_operation(handler, op, body).await {
146        Ok(response) => response,
147        Err(err) => error_to_response(&err, request_id),
148    }
149}
150
151/// Collect the incoming body into a single `Bytes` buffer.
152async fn collect_body(incoming: Incoming) -> Result<Bytes, LogsError> {
153    incoming
154        .collect()
155        .await
156        .map(http_body_util::Collected::to_bytes)
157        .map_err(|e| LogsError::internal_error(format!("Failed to read request body: {e}")))
158}
159
160/// Add common response headers to every CloudWatch Logs response.
161fn add_common_headers(
162    mut response: http::Response<LogsResponseBody>,
163    request_id: &str,
164) -> http::Response<LogsResponseBody> {
165    let headers = response.headers_mut();
166
167    if let Ok(hv) = http::HeaderValue::from_str(request_id) {
168        headers.entry("x-amzn-requestid").or_insert(hv);
169    }
170
171    headers
172        .entry("content-type")
173        .or_insert(http::HeaderValue::from_static(CONTENT_TYPE));
174
175    headers.insert("server", http::HeaderValue::from_static("Rustack"));
176
177    // CORS headers.
178    headers.insert(
179        "access-control-allow-origin",
180        http::HeaderValue::from_static("*"),
181    );
182
183    response
184}