Skip to main content

rustack_logs_core/
handler.rs

1//! CloudWatch Logs handler implementation bridging HTTP to business logic.
2
3use std::{future::Future, pin::Pin, sync::Arc};
4
5use bytes::Bytes;
6use rustack_logs_http::{body::LogsResponseBody, dispatch::LogsHandler, response::json_response};
7use rustack_logs_model::{error::LogsError, operations::LogsOperation};
8
9use crate::provider::RustackLogs;
10
11/// Handler that bridges the HTTP layer to the CloudWatch Logs provider.
12#[derive(Debug)]
13pub struct RustackLogsHandler {
14    provider: Arc<RustackLogs>,
15}
16
17impl RustackLogsHandler {
18    /// Create a new handler wrapping a provider.
19    #[must_use]
20    pub fn new(provider: Arc<RustackLogs>) -> Self {
21        Self { provider }
22    }
23}
24
25impl LogsHandler for RustackLogsHandler {
26    fn handle_operation(
27        &self,
28        op: LogsOperation,
29        body: Bytes,
30    ) -> Pin<Box<dyn Future<Output = Result<http::Response<LogsResponseBody>, LogsError>> + Send>>
31    {
32        let provider = Arc::clone(&self.provider);
33        Box::pin(async move { dispatch(&provider, op, &body) })
34    }
35}
36
37/// Dispatch a CloudWatch Logs operation to the appropriate provider method.
38#[allow(clippy::too_many_lines)] // Match arms are simple one-liners; splitting would reduce clarity.
39fn dispatch(
40    provider: &RustackLogs,
41    op: LogsOperation,
42    body: &[u8],
43) -> Result<http::Response<LogsResponseBody>, LogsError> {
44    let request_id = uuid::Uuid::new_v4().to_string();
45
46    match op {
47        // Phase 0: Log Group Management
48        LogsOperation::CreateLogGroup => {
49            let input = deserialize(body)?;
50            let output = provider.handle_create_log_group(&input)?;
51            serialize(&output, &request_id)
52        }
53        LogsOperation::DeleteLogGroup => {
54            let input = deserialize(body)?;
55            let output = provider.handle_delete_log_group(&input)?;
56            serialize(&output, &request_id)
57        }
58        LogsOperation::DescribeLogGroups => {
59            let input = deserialize(body)?;
60            let output = provider.handle_describe_log_groups(&input)?;
61            serialize(&output, &request_id)
62        }
63
64        // Phase 0: Log Stream Management
65        LogsOperation::CreateLogStream => {
66            let input = deserialize(body)?;
67            let output = provider.handle_create_log_stream(&input)?;
68            serialize(&output, &request_id)
69        }
70        LogsOperation::DeleteLogStream => {
71            let input = deserialize(body)?;
72            let output = provider.handle_delete_log_stream(&input)?;
73            serialize(&output, &request_id)
74        }
75        LogsOperation::DescribeLogStreams => {
76            let input = deserialize(body)?;
77            let output = provider.handle_describe_log_streams(&input)?;
78            serialize(&output, &request_id)
79        }
80
81        // Phase 0: Log Events
82        LogsOperation::PutLogEvents => {
83            let input = deserialize(body)?;
84            let output = provider.handle_put_log_events(&input)?;
85            serialize(&output, &request_id)
86        }
87        LogsOperation::GetLogEvents => {
88            let input = deserialize(body)?;
89            let output = provider.handle_get_log_events(&input)?;
90            serialize(&output, &request_id)
91        }
92        LogsOperation::FilterLogEvents => {
93            let input = deserialize(body)?;
94            let output = provider.handle_filter_log_events(&input)?;
95            serialize(&output, &request_id)
96        }
97
98        // Phase 1: Retention Policy
99        LogsOperation::PutRetentionPolicy => {
100            let input = deserialize(body)?;
101            let output = provider.handle_put_retention_policy(&input)?;
102            serialize(&output, &request_id)
103        }
104        LogsOperation::DeleteRetentionPolicy => {
105            let input = deserialize(body)?;
106            let output = provider.handle_delete_retention_policy(&input)?;
107            serialize(&output, &request_id)
108        }
109
110        // Phase 1: Tagging (legacy)
111        LogsOperation::TagLogGroup => {
112            let input = deserialize(body)?;
113            let output = provider.handle_tag_log_group(&input)?;
114            serialize(&output, &request_id)
115        }
116        LogsOperation::UntagLogGroup => {
117            let input = deserialize(body)?;
118            let output = provider.handle_untag_log_group(&input)?;
119            serialize(&output, &request_id)
120        }
121        LogsOperation::ListTagsLogGroup => {
122            let input = deserialize(body)?;
123            let output = provider.handle_list_tags_log_group(&input)?;
124            serialize(&output, &request_id)
125        }
126
127        // Phase 1: Tagging (ARN-based)
128        LogsOperation::TagResource => {
129            let input = deserialize(body)?;
130            let output = provider.handle_tag_resource(&input)?;
131            serialize(&output, &request_id)
132        }
133        LogsOperation::UntagResource => {
134            let input = deserialize(body)?;
135            let output = provider.handle_untag_resource(&input)?;
136            serialize(&output, &request_id)
137        }
138        LogsOperation::ListTagsForResource => {
139            let input = deserialize(body)?;
140            let output = provider.handle_list_tags_for_resource(&input)?;
141            serialize(&output, &request_id)
142        }
143
144        // Phase 1: Resource Policies
145        LogsOperation::PutResourcePolicy => {
146            let input = deserialize(body)?;
147            let output = provider.handle_put_resource_policy(&input)?;
148            serialize(&output, &request_id)
149        }
150        LogsOperation::DeleteResourcePolicy => {
151            let input = deserialize(body)?;
152            let output = provider.handle_delete_resource_policy(&input)?;
153            serialize(&output, &request_id)
154        }
155        LogsOperation::DescribeResourcePolicies => {
156            let input = deserialize(body)?;
157            let output = provider.handle_describe_resource_policies(&input)?;
158            serialize(&output, &request_id)
159        }
160
161        // Phase 2: Metric Filters
162        LogsOperation::PutMetricFilter => {
163            let input = deserialize(body)?;
164            let output = provider.handle_put_metric_filter(&input)?;
165            serialize(&output, &request_id)
166        }
167        LogsOperation::DeleteMetricFilter => {
168            let input = deserialize(body)?;
169            let output = provider.handle_delete_metric_filter(&input)?;
170            serialize(&output, &request_id)
171        }
172        LogsOperation::DescribeMetricFilters => {
173            let input = deserialize(body)?;
174            let output = provider.handle_describe_metric_filters(&input)?;
175            serialize(&output, &request_id)
176        }
177        LogsOperation::TestMetricFilter => {
178            let input = deserialize(body)?;
179            let output = provider.handle_test_metric_filter(&input)?;
180            serialize(&output, &request_id)
181        }
182
183        // Phase 2: Subscription Filters
184        LogsOperation::PutSubscriptionFilter => {
185            let input = deserialize(body)?;
186            let output = provider.handle_put_subscription_filter(&input)?;
187            serialize(&output, &request_id)
188        }
189        LogsOperation::DeleteSubscriptionFilter => {
190            let input = deserialize(body)?;
191            let output = provider.handle_delete_subscription_filter(&input)?;
192            serialize(&output, &request_id)
193        }
194        LogsOperation::DescribeSubscriptionFilters => {
195            let input = deserialize(body)?;
196            let output = provider.handle_describe_subscription_filters(&input)?;
197            serialize(&output, &request_id)
198        }
199
200        // Phase 3: Destinations
201        LogsOperation::PutDestination => {
202            let input = deserialize(body)?;
203            let output = provider.handle_put_destination(&input)?;
204            serialize(&output, &request_id)
205        }
206        LogsOperation::PutDestinationPolicy => {
207            let input = deserialize(body)?;
208            let output = provider.handle_put_destination_policy(&input)?;
209            serialize(&output, &request_id)
210        }
211        LogsOperation::DeleteDestination => {
212            let input = deserialize(body)?;
213            let output = provider.handle_delete_destination(&input)?;
214            serialize(&output, &request_id)
215        }
216        LogsOperation::DescribeDestinations => {
217            let input = deserialize(body)?;
218            let output = provider.handle_describe_destinations(&input)?;
219            serialize(&output, &request_id)
220        }
221
222        // Phase 3: Query Operations (stubs)
223        LogsOperation::StartQuery => {
224            let input = deserialize(body)?;
225            let output = provider.handle_start_query(&input)?;
226            serialize(&output, &request_id)
227        }
228        LogsOperation::StopQuery => {
229            let input = deserialize(body)?;
230            let output = provider.handle_stop_query(&input)?;
231            serialize(&output, &request_id)
232        }
233        LogsOperation::GetQueryResults => {
234            let input = deserialize(body)?;
235            let output = provider.handle_get_query_results(&input)?;
236            serialize(&output, &request_id)
237        }
238        LogsOperation::DescribeQueries => {
239            let input = deserialize(body)?;
240            let output = provider.handle_describe_queries(&input)?;
241            serialize(&output, &request_id)
242        }
243
244        // Phase 3: Query Definitions
245        LogsOperation::PutQueryDefinition => {
246            let input = deserialize(body)?;
247            let output = provider.handle_put_query_definition(&input)?;
248            serialize(&output, &request_id)
249        }
250        LogsOperation::DeleteQueryDefinition => {
251            let input = deserialize(body)?;
252            let output = provider.handle_delete_query_definition(&input)?;
253            serialize(&output, &request_id)
254        }
255        LogsOperation::DescribeQueryDefinitions => {
256            let input = deserialize(body)?;
257            let output = provider.handle_describe_query_definitions(&input)?;
258            serialize(&output, &request_id)
259        }
260
261        // Phase 3: KMS Key Association
262        LogsOperation::AssociateKmsKey => {
263            let input = deserialize(body)?;
264            let output = provider.handle_associate_kms_key(&input)?;
265            serialize(&output, &request_id)
266        }
267        LogsOperation::DisassociateKmsKey => {
268            let input = deserialize(body)?;
269            let output = provider.handle_disassociate_kms_key(&input)?;
270            serialize(&output, &request_id)
271        }
272
273        // Not implemented operations
274        LogsOperation::CreateExportTask
275        | LogsOperation::CancelExportTask
276        | LogsOperation::DescribeExportTasks => Err(LogsError::not_implemented(op.as_str())),
277    }
278}
279
280/// Deserialize a JSON request body into the input type.
281fn deserialize<T: serde::de::DeserializeOwned>(body: &[u8]) -> Result<T, LogsError> {
282    serde_json::from_slice(body).map_err(|e| {
283        let msg = e.to_string();
284        if msg.contains("missing field") || msg.contains("unknown variant") {
285            LogsError::validation(format!("1 validation error detected: {msg}"))
286        } else {
287            LogsError::validation(format!("Failed to deserialize request body: {e}"))
288        }
289    })
290}
291
292/// Serialize an output type into a JSON HTTP response.
293fn serialize<T: serde::Serialize>(
294    output: &T,
295    request_id: &str,
296) -> Result<http::Response<LogsResponseBody>, LogsError> {
297    let json = serde_json::to_vec(output)
298        .map_err(|e| LogsError::internal_error(format!("Failed to serialize response: {e}")))?;
299    Ok(json_response(json, request_id))
300}