rustack_logs_core/
handler.rs1use std::{future::Future, pin::Pin, sync::Arc};
4
5use bytes::Bytes;
6use rustack_logs_http::{body::LogsResponseBody, dispatch::LogsHandler, response::json_response};
7use rustack_logs_model::{error::LogsError, operations::LogsOperation};
8
9use crate::provider::RustackLogs;
10
11#[derive(Debug)]
13pub struct RustackLogsHandler {
14 provider: Arc<RustackLogs>,
15}
16
17impl RustackLogsHandler {
18 #[must_use]
20 pub fn new(provider: Arc<RustackLogs>) -> Self {
21 Self { provider }
22 }
23}
24
25impl LogsHandler for RustackLogsHandler {
26 fn handle_operation(
27 &self,
28 op: LogsOperation,
29 body: Bytes,
30 ) -> Pin<Box<dyn Future<Output = Result<http::Response<LogsResponseBody>, LogsError>> + Send>>
31 {
32 let provider = Arc::clone(&self.provider);
33 Box::pin(async move { dispatch(&provider, op, &body) })
34 }
35}
36
37#[allow(clippy::too_many_lines)] fn dispatch(
40 provider: &RustackLogs,
41 op: LogsOperation,
42 body: &[u8],
43) -> Result<http::Response<LogsResponseBody>, LogsError> {
44 let request_id = uuid::Uuid::new_v4().to_string();
45
46 match op {
47 LogsOperation::CreateLogGroup => {
49 let input = deserialize(body)?;
50 let output = provider.handle_create_log_group(&input)?;
51 serialize(&output, &request_id)
52 }
53 LogsOperation::DeleteLogGroup => {
54 let input = deserialize(body)?;
55 let output = provider.handle_delete_log_group(&input)?;
56 serialize(&output, &request_id)
57 }
58 LogsOperation::DescribeLogGroups => {
59 let input = deserialize(body)?;
60 let output = provider.handle_describe_log_groups(&input)?;
61 serialize(&output, &request_id)
62 }
63
64 LogsOperation::CreateLogStream => {
66 let input = deserialize(body)?;
67 let output = provider.handle_create_log_stream(&input)?;
68 serialize(&output, &request_id)
69 }
70 LogsOperation::DeleteLogStream => {
71 let input = deserialize(body)?;
72 let output = provider.handle_delete_log_stream(&input)?;
73 serialize(&output, &request_id)
74 }
75 LogsOperation::DescribeLogStreams => {
76 let input = deserialize(body)?;
77 let output = provider.handle_describe_log_streams(&input)?;
78 serialize(&output, &request_id)
79 }
80
81 LogsOperation::PutLogEvents => {
83 let input = deserialize(body)?;
84 let output = provider.handle_put_log_events(&input)?;
85 serialize(&output, &request_id)
86 }
87 LogsOperation::GetLogEvents => {
88 let input = deserialize(body)?;
89 let output = provider.handle_get_log_events(&input)?;
90 serialize(&output, &request_id)
91 }
92 LogsOperation::FilterLogEvents => {
93 let input = deserialize(body)?;
94 let output = provider.handle_filter_log_events(&input)?;
95 serialize(&output, &request_id)
96 }
97
98 LogsOperation::PutRetentionPolicy => {
100 let input = deserialize(body)?;
101 let output = provider.handle_put_retention_policy(&input)?;
102 serialize(&output, &request_id)
103 }
104 LogsOperation::DeleteRetentionPolicy => {
105 let input = deserialize(body)?;
106 let output = provider.handle_delete_retention_policy(&input)?;
107 serialize(&output, &request_id)
108 }
109
110 LogsOperation::TagLogGroup => {
112 let input = deserialize(body)?;
113 let output = provider.handle_tag_log_group(&input)?;
114 serialize(&output, &request_id)
115 }
116 LogsOperation::UntagLogGroup => {
117 let input = deserialize(body)?;
118 let output = provider.handle_untag_log_group(&input)?;
119 serialize(&output, &request_id)
120 }
121 LogsOperation::ListTagsLogGroup => {
122 let input = deserialize(body)?;
123 let output = provider.handle_list_tags_log_group(&input)?;
124 serialize(&output, &request_id)
125 }
126
127 LogsOperation::TagResource => {
129 let input = deserialize(body)?;
130 let output = provider.handle_tag_resource(&input)?;
131 serialize(&output, &request_id)
132 }
133 LogsOperation::UntagResource => {
134 let input = deserialize(body)?;
135 let output = provider.handle_untag_resource(&input)?;
136 serialize(&output, &request_id)
137 }
138 LogsOperation::ListTagsForResource => {
139 let input = deserialize(body)?;
140 let output = provider.handle_list_tags_for_resource(&input)?;
141 serialize(&output, &request_id)
142 }
143
144 LogsOperation::PutResourcePolicy => {
146 let input = deserialize(body)?;
147 let output = provider.handle_put_resource_policy(&input)?;
148 serialize(&output, &request_id)
149 }
150 LogsOperation::DeleteResourcePolicy => {
151 let input = deserialize(body)?;
152 let output = provider.handle_delete_resource_policy(&input)?;
153 serialize(&output, &request_id)
154 }
155 LogsOperation::DescribeResourcePolicies => {
156 let input = deserialize(body)?;
157 let output = provider.handle_describe_resource_policies(&input)?;
158 serialize(&output, &request_id)
159 }
160
161 LogsOperation::PutMetricFilter => {
163 let input = deserialize(body)?;
164 let output = provider.handle_put_metric_filter(&input)?;
165 serialize(&output, &request_id)
166 }
167 LogsOperation::DeleteMetricFilter => {
168 let input = deserialize(body)?;
169 let output = provider.handle_delete_metric_filter(&input)?;
170 serialize(&output, &request_id)
171 }
172 LogsOperation::DescribeMetricFilters => {
173 let input = deserialize(body)?;
174 let output = provider.handle_describe_metric_filters(&input)?;
175 serialize(&output, &request_id)
176 }
177 LogsOperation::TestMetricFilter => {
178 let input = deserialize(body)?;
179 let output = provider.handle_test_metric_filter(&input)?;
180 serialize(&output, &request_id)
181 }
182
183 LogsOperation::PutSubscriptionFilter => {
185 let input = deserialize(body)?;
186 let output = provider.handle_put_subscription_filter(&input)?;
187 serialize(&output, &request_id)
188 }
189 LogsOperation::DeleteSubscriptionFilter => {
190 let input = deserialize(body)?;
191 let output = provider.handle_delete_subscription_filter(&input)?;
192 serialize(&output, &request_id)
193 }
194 LogsOperation::DescribeSubscriptionFilters => {
195 let input = deserialize(body)?;
196 let output = provider.handle_describe_subscription_filters(&input)?;
197 serialize(&output, &request_id)
198 }
199
200 LogsOperation::PutDestination => {
202 let input = deserialize(body)?;
203 let output = provider.handle_put_destination(&input)?;
204 serialize(&output, &request_id)
205 }
206 LogsOperation::PutDestinationPolicy => {
207 let input = deserialize(body)?;
208 let output = provider.handle_put_destination_policy(&input)?;
209 serialize(&output, &request_id)
210 }
211 LogsOperation::DeleteDestination => {
212 let input = deserialize(body)?;
213 let output = provider.handle_delete_destination(&input)?;
214 serialize(&output, &request_id)
215 }
216 LogsOperation::DescribeDestinations => {
217 let input = deserialize(body)?;
218 let output = provider.handle_describe_destinations(&input)?;
219 serialize(&output, &request_id)
220 }
221
222 LogsOperation::StartQuery => {
224 let input = deserialize(body)?;
225 let output = provider.handle_start_query(&input)?;
226 serialize(&output, &request_id)
227 }
228 LogsOperation::StopQuery => {
229 let input = deserialize(body)?;
230 let output = provider.handle_stop_query(&input)?;
231 serialize(&output, &request_id)
232 }
233 LogsOperation::GetQueryResults => {
234 let input = deserialize(body)?;
235 let output = provider.handle_get_query_results(&input)?;
236 serialize(&output, &request_id)
237 }
238 LogsOperation::DescribeQueries => {
239 let input = deserialize(body)?;
240 let output = provider.handle_describe_queries(&input)?;
241 serialize(&output, &request_id)
242 }
243
244 LogsOperation::PutQueryDefinition => {
246 let input = deserialize(body)?;
247 let output = provider.handle_put_query_definition(&input)?;
248 serialize(&output, &request_id)
249 }
250 LogsOperation::DeleteQueryDefinition => {
251 let input = deserialize(body)?;
252 let output = provider.handle_delete_query_definition(&input)?;
253 serialize(&output, &request_id)
254 }
255 LogsOperation::DescribeQueryDefinitions => {
256 let input = deserialize(body)?;
257 let output = provider.handle_describe_query_definitions(&input)?;
258 serialize(&output, &request_id)
259 }
260
261 LogsOperation::AssociateKmsKey => {
263 let input = deserialize(body)?;
264 let output = provider.handle_associate_kms_key(&input)?;
265 serialize(&output, &request_id)
266 }
267 LogsOperation::DisassociateKmsKey => {
268 let input = deserialize(body)?;
269 let output = provider.handle_disassociate_kms_key(&input)?;
270 serialize(&output, &request_id)
271 }
272
273 LogsOperation::CreateExportTask
275 | LogsOperation::CancelExportTask
276 | LogsOperation::DescribeExportTasks => Err(LogsError::not_implemented(op.as_str())),
277 }
278}
279
280fn deserialize<T: serde::de::DeserializeOwned>(body: &[u8]) -> Result<T, LogsError> {
282 serde_json::from_slice(body).map_err(|e| {
283 let msg = e.to_string();
284 if msg.contains("missing field") || msg.contains("unknown variant") {
285 LogsError::validation(format!("1 validation error detected: {msg}"))
286 } else {
287 LogsError::validation(format!("Failed to deserialize request body: {e}"))
288 }
289 })
290}
291
292fn serialize<T: serde::Serialize>(
294 output: &T,
295 request_id: &str,
296) -> Result<http::Response<LogsResponseBody>, LogsError> {
297 let json = serde_json::to_vec(output)
298 .map_err(|e| LogsError::internal_error(format!("Failed to serialize response: {e}")))?;
299 Ok(json_response(json, request_id))
300}