1use http::StatusCode;
2use serde_json::{json, Value};
3
4use crate::validation::*;
5use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
6
7use super::LogsService;
8use chrono::Utc;
9
10use crate::state::{AnomalyDetector, LogAnomaly};
11
12impl LogsService {
13 pub(crate) fn create_log_anomaly_detector(
16 &self,
17 req: &AwsRequest,
18 ) -> Result<AwsResponse, AwsServiceError> {
19 let body = req.json_body();
20 validate_optional_string_length("detectorName", body["detectorName"].as_str(), 1, 2048)?;
21 validate_optional_enum_value(
22 "evaluationFrequency",
23 &body["evaluationFrequency"],
24 &[
25 "ONE_MIN",
26 "FIVE_MIN",
27 "TEN_MIN",
28 "FIFTEEN_MIN",
29 "THIRTY_MIN",
30 "ONE_HOUR",
31 ],
32 )?;
33 validate_optional_string_length("filterPattern", body["filterPattern"].as_str(), 0, 1024)?;
34 validate_optional_string_length("kmsKeyId", body["kmsKeyId"].as_str(), 0, 256)?;
35 validate_optional_range_i64(
36 "anomalyVisibilityTime",
37 body["anomalyVisibilityTime"].as_i64(),
38 7,
39 90,
40 )?;
41
42 let log_group_arn_list = body["logGroupArnList"]
43 .as_array()
44 .ok_or_else(|| {
45 AwsServiceError::aws_error(
46 StatusCode::BAD_REQUEST,
47 "InvalidParameterException",
48 "logGroupArnList is required",
49 )
50 })?
51 .iter()
52 .map(|v| {
53 v.as_str().map(|s| s.to_string()).ok_or_else(|| {
54 AwsServiceError::aws_error(
55 StatusCode::BAD_REQUEST,
56 "InvalidParameterException",
57 "logGroupArnList elements must be strings",
58 )
59 })
60 })
61 .collect::<Result<Vec<_>, _>>()?;
62
63 let detector_name = body["detectorName"].as_str().unwrap_or("").to_string();
64 let evaluation_frequency = body["evaluationFrequency"].as_str().map(|s| s.to_string());
65 let filter_pattern = body["filterPattern"].as_str().map(|s| s.to_string());
66 let anomaly_visibility_time = body["anomalyVisibilityTime"].as_i64();
67 let tags = body["tags"]
68 .as_object()
69 .map(|obj| {
70 obj.iter()
71 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
72 .collect()
73 })
74 .unwrap_or_default();
75
76 let now = Utc::now().timestamp_millis();
77 let mut accounts = self.state.write();
78 let state = accounts.get_or_create(&req.account_id);
79 let detector_id = uuid::Uuid::new_v4().to_string();
80 let arn = format!(
81 "arn:aws:logs:{}:{}:anomaly-detector:{}",
82 state.region, state.account_id, detector_id
83 );
84
85 let detector = AnomalyDetector {
86 detector_name: detector_name.clone(),
87 arn: arn.clone(),
88 log_group_arn_list,
89 evaluation_frequency,
90 filter_pattern,
91 anomaly_visibility_time,
92 creation_time: now,
93 last_modified_time: now,
94 enabled: true,
95 tags,
96 };
97
98 state.anomaly_detectors.insert(arn.clone(), detector);
99
100 Ok(AwsResponse::json(
101 StatusCode::OK,
102 serde_json::to_string(&json!({ "anomalyDetectorArn": arn })).unwrap(),
103 ))
104 }
105
106 pub(crate) fn get_log_anomaly_detector(
107 &self,
108 req: &AwsRequest,
109 ) -> Result<AwsResponse, AwsServiceError> {
110 let body = req.json_body();
111 let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
112 AwsServiceError::aws_error(
113 StatusCode::BAD_REQUEST,
114 "InvalidParameterException",
115 "anomalyDetectorArn is required",
116 )
117 })?;
118
119 let accounts = self.state.read();
120 let empty = crate::state::LogsState::new(&req.account_id, &req.region);
121 let state = accounts.get(&req.account_id).unwrap_or(&empty);
122 let detector = state.anomaly_detectors.get(arn).ok_or_else(|| {
123 AwsServiceError::aws_error(
124 StatusCode::BAD_REQUEST,
125 "ResourceNotFoundException",
126 format!("Anomaly detector not found: {arn}"),
127 )
128 })?;
129
130 let mut result = json!({
131 "anomalyDetectorArn": detector.arn,
132 "detectorName": detector.detector_name,
133 "logGroupArnList": detector.log_group_arn_list,
134 "creationTimeStamp": detector.creation_time,
135 "lastModifiedTimeStamp": detector.last_modified_time,
136 "anomalyDetectorStatus": if detector.enabled { "TRAINING" } else { "PAUSED" },
137 });
138 if let Some(ref f) = detector.evaluation_frequency {
139 result["evaluationFrequency"] = json!(f);
140 }
141 if let Some(ref f) = detector.filter_pattern {
142 result["filterPattern"] = json!(f);
143 }
144 if let Some(t) = detector.anomaly_visibility_time {
145 result["anomalyVisibilityTime"] = json!(t);
146 }
147
148 Ok(AwsResponse::json(
149 StatusCode::OK,
150 serde_json::to_string(&result).unwrap(),
151 ))
152 }
153
154 pub(crate) fn delete_log_anomaly_detector(
155 &self,
156 req: &AwsRequest,
157 ) -> Result<AwsResponse, AwsServiceError> {
158 let body = req.json_body();
159 let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
160 AwsServiceError::aws_error(
161 StatusCode::BAD_REQUEST,
162 "InvalidParameterException",
163 "anomalyDetectorArn is required",
164 )
165 })?;
166
167 let mut accounts = self.state.write();
168 let state = accounts.get_or_create(&req.account_id);
169 if state.anomaly_detectors.remove(arn).is_none() {
170 return Err(AwsServiceError::aws_error(
171 StatusCode::BAD_REQUEST,
172 "ResourceNotFoundException",
173 format!("Anomaly detector not found: {arn}"),
174 ));
175 }
176
177 Ok(AwsResponse::json(StatusCode::OK, "{}"))
178 }
179
180 pub(crate) fn list_log_anomaly_detectors(
181 &self,
182 req: &AwsRequest,
183 ) -> Result<AwsResponse, AwsServiceError> {
184 let body = req.json_body();
185 validate_optional_string_length(
186 "filterLogGroupArn",
187 body["filterLogGroupArn"].as_str(),
188 1,
189 2048,
190 )?;
191 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
192 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
193 let filter_log_group_arn = body["filterLogGroupArn"].as_str();
194 let _limit = body["limit"].as_i64().unwrap_or(50);
195
196 let accounts = self.state.read();
197 let empty = crate::state::LogsState::new(&req.account_id, &req.region);
198 let state = accounts.get(&req.account_id).unwrap_or(&empty);
199 let detectors: Vec<Value> = state
200 .anomaly_detectors
201 .values()
202 .filter(|d| {
203 filter_log_group_arn.is_none_or(|arn| d.log_group_arn_list.iter().any(|a| a == arn))
204 })
205 .map(|d| {
206 let mut obj = json!({
207 "anomalyDetectorArn": d.arn,
208 "detectorName": d.detector_name,
209 "logGroupArnList": d.log_group_arn_list,
210 "creationTimeStamp": d.creation_time,
211 "lastModifiedTimeStamp": d.last_modified_time,
212 "anomalyDetectorStatus": if d.enabled { "TRAINING" } else { "PAUSED" },
213 });
214 if let Some(ref f) = d.evaluation_frequency {
215 obj["evaluationFrequency"] = json!(f);
216 }
217 if let Some(ref f) = d.filter_pattern {
218 obj["filterPattern"] = json!(f);
219 }
220 if let Some(t) = d.anomaly_visibility_time {
221 obj["anomalyVisibilityTime"] = json!(t);
222 }
223 obj
224 })
225 .collect();
226
227 Ok(AwsResponse::json(
228 StatusCode::OK,
229 serde_json::to_string(&json!({ "anomalyDetectors": detectors })).unwrap(),
230 ))
231 }
232
233 pub(crate) fn update_log_anomaly_detector(
234 &self,
235 req: &AwsRequest,
236 ) -> Result<AwsResponse, AwsServiceError> {
237 let body = req.json_body();
238 let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
239 AwsServiceError::aws_error(
240 StatusCode::BAD_REQUEST,
241 "InvalidParameterException",
242 "anomalyDetectorArn is required",
243 )
244 })?;
245 validate_optional_enum_value(
246 "evaluationFrequency",
247 &body["evaluationFrequency"],
248 &[
249 "ONE_MIN",
250 "FIVE_MIN",
251 "TEN_MIN",
252 "FIFTEEN_MIN",
253 "THIRTY_MIN",
254 "ONE_HOUR",
255 ],
256 )?;
257 let enabled = body["enabled"].as_bool().unwrap_or(true);
258
259 let mut accounts = self.state.write();
260 let state = accounts.get_or_create(&req.account_id);
261 let detector = state.anomaly_detectors.get_mut(arn).ok_or_else(|| {
262 AwsServiceError::aws_error(
263 StatusCode::BAD_REQUEST,
264 "ResourceNotFoundException",
265 format!("Anomaly detector not found: {arn}"),
266 )
267 })?;
268
269 detector.enabled = enabled;
270 if let Some(f) = body["evaluationFrequency"].as_str() {
271 detector.evaluation_frequency = Some(f.to_string());
272 }
273 if let Some(f) = body["filterPattern"].as_str() {
274 detector.filter_pattern = Some(f.to_string());
275 }
276 if let Some(t) = body["anomalyVisibilityTime"].as_i64() {
277 detector.anomaly_visibility_time = Some(t);
278 }
279 detector.last_modified_time = Utc::now().timestamp_millis();
280
281 Ok(AwsResponse::json(StatusCode::OK, "{}"))
282 }
283
284 pub(crate) fn list_anomalies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
285 let body = req.json_body();
286 validate_optional_string_length(
287 "anomalyDetectorArn",
288 body["anomalyDetectorArn"].as_str(),
289 1,
290 2048,
291 )?;
292 validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
293 validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
294 validate_optional_enum_value(
295 "suppressionState",
296 &body["suppressionState"],
297 &["SUPPRESSED", "UNSUPPRESSED"],
298 )?;
299 let detector_filter = body["anomalyDetectorArn"].as_str();
300 let suppression = body["suppressionState"].as_str();
301
302 let accounts = self.state.read();
303 let empty = crate::state::LogsState::new(&req.account_id, &req.region);
304 let state = accounts.get(&req.account_id).unwrap_or(&empty);
305 let anomalies: Vec<Value> = state
306 .anomalies
307 .values()
308 .filter(|a| {
309 detector_filter.is_none_or(|d| a.anomaly_detector_arn == d)
310 && suppression.is_none_or(|s| match s {
311 "SUPPRESSED" => a.suppressed,
312 "UNSUPPRESSED" => !a.suppressed,
313 _ => true,
314 })
315 })
316 .map(|a| {
317 json!({
318 "anomalyId": a.anomaly_id,
319 "anomalyDetectorArn": a.anomaly_detector_arn,
320 "logGroupArnList": a.log_group_arn_list,
321 "patternId": a.pattern_id,
322 "patternString": a.pattern_string,
323 "firstSeen": a.first_seen,
324 "lastSeen": a.last_seen,
325 "priority": a.priority,
326 "state": a.state,
327 "active": !a.suppressed,
328 "suppressed": a.suppressed,
329 })
330 })
331 .collect();
332
333 Ok(AwsResponse::json(
334 StatusCode::OK,
335 serde_json::to_string(&json!({ "anomalies": anomalies })).unwrap(),
336 ))
337 }
338
339 pub(crate) fn update_anomaly(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
340 let body = req.json_body();
341 validate_required("anomalyDetectorArn", &body["anomalyDetectorArn"])?;
342 validate_optional_string_length(
343 "anomalyDetectorArn",
344 body["anomalyDetectorArn"].as_str(),
345 1,
346 2048,
347 )?;
348 validate_optional_string_length("anomalyId", body["anomalyId"].as_str(), 36, 36)?;
349 validate_optional_string_length("patternId", body["patternId"].as_str(), 32, 32)?;
350 validate_optional_enum_value(
351 "suppressionType",
352 &body["suppressionType"],
353 &["LIMITED", "INFINITE"],
354 )?;
355 let anomaly_id = body["anomalyId"].as_str();
356 let suppress = !body["suppressionType"].is_null();
357 if let Some(id) = anomaly_id {
358 let mut accounts = self.state.write();
359 let state = accounts.get_or_create(&req.account_id);
360 if let Some(a) = state.anomalies.get_mut(id) {
361 a.suppressed = suppress;
362 }
363 }
364 Ok(AwsResponse::json(StatusCode::OK, "{}"))
365 }
366
367 pub fn inject_anomaly(
373 &self,
374 account_id: &str,
375 _region: &str,
376 anomaly_detector_arn: String,
377 log_group_arns: Vec<String>,
378 pattern_string: String,
379 priority: Option<String>,
380 ) -> String {
381 let now = Utc::now().timestamp_millis();
382 let anomaly_id = uuid::Uuid::new_v4().to_string();
383 let pattern_id = format!("{:032x}", uuid::Uuid::new_v4().as_u128());
384 let mut accounts = self.state.write();
385 let state = accounts.get_or_create(account_id);
386 state.anomalies.insert(
387 anomaly_id.clone(),
388 LogAnomaly {
389 anomaly_id: anomaly_id.clone(),
390 anomaly_detector_arn,
391 log_group_arn_list: log_group_arns,
392 pattern_id,
393 pattern_string,
394 first_seen: now,
395 last_seen: now,
396 priority: priority.unwrap_or_else(|| "MEDIUM".to_string()),
397 state: "ACTIVE".to_string(),
398 suppressed: false,
399 },
400 );
401 anomaly_id
402 }
403}
404
405#[cfg(test)]
406mod tests {
407 use crate::service::test_helpers::*;
408 use serde_json::{json, Value};
409
/// Walks a detector through create, get, list, update and delete, checking that each
/// step is observable through the read operations.
#[test]
fn anomaly_detector_lifecycle() {
    let svc = make_service();

    let req = make_request(
        "CreateLogAnomalyDetector",
        json!({
            "logGroupArnList": ["arn:aws:logs:us-east-1:123456789012:log-group:test:*"],
            "detectorName": "my-detector",
        }),
    );
    let resp = svc.create_log_anomaly_detector(&req).unwrap();
    let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
    let arn = body["anomalyDetectorArn"].as_str().unwrap().to_string();

    let req = make_request(
        "GetLogAnomalyDetector",
        json!({ "anomalyDetectorArn": &arn }),
    );
    let resp = svc.get_log_anomaly_detector(&req).unwrap();
    let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
    assert_eq!(body["detectorName"], "my-detector");
    // Newly created detectors start out enabled.
    assert_eq!(body["anomalyDetectorStatus"], "TRAINING");

    let req = make_request("ListLogAnomalyDetectors", json!({}));
    let resp = svc.list_log_anomaly_detectors(&req).unwrap();
    let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
    assert_eq!(body["anomalyDetectors"].as_array().unwrap().len(), 1);

    let req = make_request(
        "UpdateLogAnomalyDetector",
        json!({ "anomalyDetectorArn": &arn, "enabled": false }),
    );
    svc.update_log_anomaly_detector(&req).unwrap();

    // Disabling the detector must be visible through a subsequent get.
    let req = make_request(
        "GetLogAnomalyDetector",
        json!({ "anomalyDetectorArn": &arn }),
    );
    let resp = svc.get_log_anomaly_detector(&req).unwrap();
    let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
    assert_eq!(body["anomalyDetectorStatus"], "PAUSED");

    let req = make_request(
        "DeleteLogAnomalyDetector",
        json!({ "anomalyDetectorArn": &arn }),
    );
    svc.delete_log_anomaly_detector(&req).unwrap();

    // After deletion the detector can neither be fetched nor listed.
    let req = make_request(
        "GetLogAnomalyDetector",
        json!({ "anomalyDetectorArn": &arn }),
    );
    assert!(svc.get_log_anomaly_detector(&req).is_err());

    let req = make_request("ListLogAnomalyDetectors", json!({}));
    let resp = svc.list_log_anomaly_detectors(&req).unwrap();
    let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
    assert!(body["anomalyDetectors"].as_array().unwrap().is_empty());
}
452
/// Injects an anomaly, checks it is listed, suppresses it by ID, and verifies that
/// the suppression-state filters and the reported flags reflect the change.
#[test]
fn list_anomalies_returns_injected_entries() {
    let svc = make_service();
    let id = svc.inject_anomaly(
        "123456789012",
        "us-east-1",
        "arn:aws:logs:us-east-1:123456789012:anomaly-detector:abc".to_string(),
        vec!["arn:aws:logs:us-east-1:123456789012:log-group:test".to_string()],
        "ERROR pattern".to_string(),
        None,
    );

    let req = make_request("ListAnomalies", json!({}));
    let resp = svc.list_anomalies(&req).unwrap();
    let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
    let arr = body["anomalies"].as_array().unwrap();
    assert_eq!(arr.len(), 1);
    assert_eq!(arr[0]["anomalyId"], id);
    assert_eq!(arr[0]["patternString"], "ERROR pattern");
    assert_eq!(arr[0]["suppressed"], false);
    assert_eq!(arr[0]["active"], true);

    let req = make_request(
        "UpdateAnomaly",
        json!({
            "anomalyDetectorArn": "arn:aws:logs:us-east-1:123456789012:anomaly-detector:abc",
            "anomalyId": id,
            "suppressionType": "INFINITE",
        }),
    );
    svc.update_anomaly(&req).unwrap();

    let req = make_request("ListAnomalies", json!({"suppressionState": "SUPPRESSED"}));
    let resp = svc.list_anomalies(&req).unwrap();
    let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
    let arr = body["anomalies"].as_array().unwrap();
    assert_eq!(arr.len(), 1);
    assert_eq!(arr[0]["anomalyId"], id);
    assert_eq!(arr[0]["suppressed"], true);
    assert_eq!(arr[0]["active"], false);

    // The suppressed anomaly must no longer match the opposite filter.
    let req = make_request("ListAnomalies", json!({"suppressionState": "UNSUPPRESSED"}));
    let resp = svc.list_anomalies(&req).unwrap();
    let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
    assert!(body["anomalies"].as_array().unwrap().is_empty());
}
489}