Skip to main content

fakecloud_logs/service/
anomaly.rs

1use http::StatusCode;
2use serde_json::{json, Value};
3
4use crate::validation::*;
5use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
6
7use super::LogsService;
8use chrono::Utc;
9
10use crate::state::{AnomalyDetector, LogAnomaly};
11
12impl LogsService {
13    // ---- Anomaly Detectors ----
14
15    pub(crate) fn create_log_anomaly_detector(
16        &self,
17        req: &AwsRequest,
18    ) -> Result<AwsResponse, AwsServiceError> {
19        let body = req.json_body();
20        validate_optional_string_length("detectorName", body["detectorName"].as_str(), 1, 2048)?;
21        validate_optional_enum_value(
22            "evaluationFrequency",
23            &body["evaluationFrequency"],
24            &[
25                "ONE_MIN",
26                "FIVE_MIN",
27                "TEN_MIN",
28                "FIFTEEN_MIN",
29                "THIRTY_MIN",
30                "ONE_HOUR",
31            ],
32        )?;
33        validate_optional_string_length("filterPattern", body["filterPattern"].as_str(), 0, 1024)?;
34        validate_optional_string_length("kmsKeyId", body["kmsKeyId"].as_str(), 0, 256)?;
35        validate_optional_range_i64(
36            "anomalyVisibilityTime",
37            body["anomalyVisibilityTime"].as_i64(),
38            7,
39            90,
40        )?;
41
42        let log_group_arn_list = body["logGroupArnList"]
43            .as_array()
44            .ok_or_else(|| {
45                AwsServiceError::aws_error(
46                    StatusCode::BAD_REQUEST,
47                    "InvalidParameterException",
48                    "logGroupArnList is required",
49                )
50            })?
51            .iter()
52            .map(|v| {
53                v.as_str().map(|s| s.to_string()).ok_or_else(|| {
54                    AwsServiceError::aws_error(
55                        StatusCode::BAD_REQUEST,
56                        "InvalidParameterException",
57                        "logGroupArnList elements must be strings",
58                    )
59                })
60            })
61            .collect::<Result<Vec<_>, _>>()?;
62
63        let detector_name = body["detectorName"].as_str().unwrap_or("").to_string();
64        let evaluation_frequency = body["evaluationFrequency"].as_str().map(|s| s.to_string());
65        let filter_pattern = body["filterPattern"].as_str().map(|s| s.to_string());
66        let anomaly_visibility_time = body["anomalyVisibilityTime"].as_i64();
67
68        let now = Utc::now().timestamp_millis();
69        let mut accounts = self.state.write();
70        let state = accounts.get_or_create(&req.account_id);
71        let detector_id = uuid::Uuid::new_v4().to_string();
72        let arn = format!(
73            "arn:aws:logs:{}:{}:anomaly-detector:{}",
74            state.region, state.account_id, detector_id
75        );
76
77        let detector = AnomalyDetector {
78            detector_name: detector_name.clone(),
79            arn: arn.clone(),
80            log_group_arn_list,
81            evaluation_frequency,
82            filter_pattern,
83            anomaly_visibility_time,
84            creation_time: now,
85            last_modified_time: now,
86            enabled: true,
87        };
88
89        state.anomaly_detectors.insert(arn.clone(), detector);
90
91        Ok(AwsResponse::json(
92            StatusCode::OK,
93            serde_json::to_string(&json!({ "anomalyDetectorArn": arn })).unwrap(),
94        ))
95    }
96
97    pub(crate) fn get_log_anomaly_detector(
98        &self,
99        req: &AwsRequest,
100    ) -> Result<AwsResponse, AwsServiceError> {
101        let body = req.json_body();
102        let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
103            AwsServiceError::aws_error(
104                StatusCode::BAD_REQUEST,
105                "InvalidParameterException",
106                "anomalyDetectorArn is required",
107            )
108        })?;
109
110        let accounts = self.state.read();
111        let empty = crate::state::LogsState::new(&req.account_id, &req.region);
112        let state = accounts.get(&req.account_id).unwrap_or(&empty);
113        let detector = state.anomaly_detectors.get(arn).ok_or_else(|| {
114            AwsServiceError::aws_error(
115                StatusCode::BAD_REQUEST,
116                "ResourceNotFoundException",
117                format!("Anomaly detector not found: {arn}"),
118            )
119        })?;
120
121        let mut result = json!({
122            "anomalyDetectorArn": detector.arn,
123            "detectorName": detector.detector_name,
124            "logGroupArnList": detector.log_group_arn_list,
125            "creationTimeStamp": detector.creation_time,
126            "lastModifiedTimeStamp": detector.last_modified_time,
127            "anomalyDetectorStatus": if detector.enabled { "TRAINING" } else { "PAUSED" },
128        });
129        if let Some(ref f) = detector.evaluation_frequency {
130            result["evaluationFrequency"] = json!(f);
131        }
132        if let Some(ref f) = detector.filter_pattern {
133            result["filterPattern"] = json!(f);
134        }
135        if let Some(t) = detector.anomaly_visibility_time {
136            result["anomalyVisibilityTime"] = json!(t);
137        }
138
139        Ok(AwsResponse::json(
140            StatusCode::OK,
141            serde_json::to_string(&result).unwrap(),
142        ))
143    }
144
145    pub(crate) fn delete_log_anomaly_detector(
146        &self,
147        req: &AwsRequest,
148    ) -> Result<AwsResponse, AwsServiceError> {
149        let body = req.json_body();
150        let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
151            AwsServiceError::aws_error(
152                StatusCode::BAD_REQUEST,
153                "InvalidParameterException",
154                "anomalyDetectorArn is required",
155            )
156        })?;
157
158        let mut accounts = self.state.write();
159        let state = accounts.get_or_create(&req.account_id);
160        if state.anomaly_detectors.remove(arn).is_none() {
161            return Err(AwsServiceError::aws_error(
162                StatusCode::BAD_REQUEST,
163                "ResourceNotFoundException",
164                format!("Anomaly detector not found: {arn}"),
165            ));
166        }
167
168        Ok(AwsResponse::json(StatusCode::OK, "{}"))
169    }
170
171    pub(crate) fn list_log_anomaly_detectors(
172        &self,
173        req: &AwsRequest,
174    ) -> Result<AwsResponse, AwsServiceError> {
175        let body = req.json_body();
176        validate_optional_string_length(
177            "filterLogGroupArn",
178            body["filterLogGroupArn"].as_str(),
179            1,
180            2048,
181        )?;
182        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
183        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
184        let filter_log_group_arn = body["filterLogGroupArn"].as_str();
185        let _limit = body["limit"].as_i64().unwrap_or(50);
186
187        let accounts = self.state.read();
188        let empty = crate::state::LogsState::new(&req.account_id, &req.region);
189        let state = accounts.get(&req.account_id).unwrap_or(&empty);
190        let detectors: Vec<Value> = state
191            .anomaly_detectors
192            .values()
193            .filter(|d| {
194                filter_log_group_arn.is_none_or(|arn| d.log_group_arn_list.iter().any(|a| a == arn))
195            })
196            .map(|d| {
197                let mut obj = json!({
198                    "anomalyDetectorArn": d.arn,
199                    "detectorName": d.detector_name,
200                    "logGroupArnList": d.log_group_arn_list,
201                    "creationTimeStamp": d.creation_time,
202                    "lastModifiedTimeStamp": d.last_modified_time,
203                    "anomalyDetectorStatus": if d.enabled { "TRAINING" } else { "PAUSED" },
204                });
205                if let Some(ref f) = d.evaluation_frequency {
206                    obj["evaluationFrequency"] = json!(f);
207                }
208                if let Some(ref f) = d.filter_pattern {
209                    obj["filterPattern"] = json!(f);
210                }
211                if let Some(t) = d.anomaly_visibility_time {
212                    obj["anomalyVisibilityTime"] = json!(t);
213                }
214                obj
215            })
216            .collect();
217
218        Ok(AwsResponse::json(
219            StatusCode::OK,
220            serde_json::to_string(&json!({ "anomalyDetectors": detectors })).unwrap(),
221        ))
222    }
223
224    pub(crate) fn update_log_anomaly_detector(
225        &self,
226        req: &AwsRequest,
227    ) -> Result<AwsResponse, AwsServiceError> {
228        let body = req.json_body();
229        let arn = body["anomalyDetectorArn"].as_str().ok_or_else(|| {
230            AwsServiceError::aws_error(
231                StatusCode::BAD_REQUEST,
232                "InvalidParameterException",
233                "anomalyDetectorArn is required",
234            )
235        })?;
236        validate_optional_enum_value(
237            "evaluationFrequency",
238            &body["evaluationFrequency"],
239            &[
240                "ONE_MIN",
241                "FIVE_MIN",
242                "TEN_MIN",
243                "FIFTEEN_MIN",
244                "THIRTY_MIN",
245                "ONE_HOUR",
246            ],
247        )?;
248        let enabled = body["enabled"].as_bool().unwrap_or(true);
249
250        let mut accounts = self.state.write();
251        let state = accounts.get_or_create(&req.account_id);
252        let detector = state.anomaly_detectors.get_mut(arn).ok_or_else(|| {
253            AwsServiceError::aws_error(
254                StatusCode::BAD_REQUEST,
255                "ResourceNotFoundException",
256                format!("Anomaly detector not found: {arn}"),
257            )
258        })?;
259
260        detector.enabled = enabled;
261        if let Some(f) = body["evaluationFrequency"].as_str() {
262            detector.evaluation_frequency = Some(f.to_string());
263        }
264        if let Some(f) = body["filterPattern"].as_str() {
265            detector.filter_pattern = Some(f.to_string());
266        }
267        if let Some(t) = body["anomalyVisibilityTime"].as_i64() {
268            detector.anomaly_visibility_time = Some(t);
269        }
270        detector.last_modified_time = Utc::now().timestamp_millis();
271
272        Ok(AwsResponse::json(StatusCode::OK, "{}"))
273    }
274
275    pub(crate) fn list_anomalies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
276        let body = req.json_body();
277        validate_optional_string_length(
278            "anomalyDetectorArn",
279            body["anomalyDetectorArn"].as_str(),
280            1,
281            2048,
282        )?;
283        validate_optional_range_i64("limit", body["limit"].as_i64(), 1, 50)?;
284        validate_optional_string_length("nextToken", body["nextToken"].as_str(), 1, 4096)?;
285        validate_optional_enum_value(
286            "suppressionState",
287            &body["suppressionState"],
288            &["SUPPRESSED", "UNSUPPRESSED"],
289        )?;
290        let detector_filter = body["anomalyDetectorArn"].as_str();
291        let suppression = body["suppressionState"].as_str();
292
293        let accounts = self.state.read();
294        let empty = crate::state::LogsState::new(&req.account_id, &req.region);
295        let state = accounts.get(&req.account_id).unwrap_or(&empty);
296        let anomalies: Vec<Value> = state
297            .anomalies
298            .values()
299            .filter(|a| {
300                detector_filter.is_none_or(|d| a.anomaly_detector_arn == d)
301                    && suppression.is_none_or(|s| match s {
302                        "SUPPRESSED" => a.suppressed,
303                        "UNSUPPRESSED" => !a.suppressed,
304                        _ => true,
305                    })
306            })
307            .map(|a| {
308                json!({
309                    "anomalyId": a.anomaly_id,
310                    "anomalyDetectorArn": a.anomaly_detector_arn,
311                    "logGroupArnList": a.log_group_arn_list,
312                    "patternId": a.pattern_id,
313                    "patternString": a.pattern_string,
314                    "firstSeen": a.first_seen,
315                    "lastSeen": a.last_seen,
316                    "priority": a.priority,
317                    "state": a.state,
318                    "active": !a.suppressed,
319                    "suppressed": a.suppressed,
320                })
321            })
322            .collect();
323
324        Ok(AwsResponse::json(
325            StatusCode::OK,
326            serde_json::to_string(&json!({ "anomalies": anomalies })).unwrap(),
327        ))
328    }
329
330    pub(crate) fn update_anomaly(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
331        let body = req.json_body();
332        validate_required("anomalyDetectorArn", &body["anomalyDetectorArn"])?;
333        validate_optional_string_length(
334            "anomalyDetectorArn",
335            body["anomalyDetectorArn"].as_str(),
336            1,
337            2048,
338        )?;
339        validate_optional_string_length("anomalyId", body["anomalyId"].as_str(), 36, 36)?;
340        validate_optional_string_length("patternId", body["patternId"].as_str(), 32, 32)?;
341        validate_optional_enum_value(
342            "suppressionType",
343            &body["suppressionType"],
344            &["LIMITED", "INFINITE"],
345        )?;
346        let anomaly_id = body["anomalyId"].as_str();
347        let suppress = !body["suppressionType"].is_null();
348        if let Some(id) = anomaly_id {
349            let mut accounts = self.state.write();
350            let state = accounts.get_or_create(&req.account_id);
351            if let Some(a) = state.anomalies.get_mut(id) {
352                a.suppressed = suppress;
353            }
354        }
355        Ok(AwsResponse::json(StatusCode::OK, "{}"))
356    }
357
358    // -- Import tasks --
359
360    /// Admin: inject a synthetic anomaly so tests can exercise
361    /// ListAnomalies / UpdateAnomaly without running real detection.
362    /// Called from the `/_fakecloud/logs/anomalies/inject` endpoint.
363    pub fn inject_anomaly(
364        &self,
365        account_id: &str,
366        _region: &str,
367        anomaly_detector_arn: String,
368        log_group_arns: Vec<String>,
369        pattern_string: String,
370        priority: Option<String>,
371    ) -> String {
372        let now = Utc::now().timestamp_millis();
373        let anomaly_id = uuid::Uuid::new_v4().to_string();
374        let pattern_id = format!("{:032x}", uuid::Uuid::new_v4().as_u128());
375        let mut accounts = self.state.write();
376        let state = accounts.get_or_create(account_id);
377        state.anomalies.insert(
378            anomaly_id.clone(),
379            LogAnomaly {
380                anomaly_id: anomaly_id.clone(),
381                anomaly_detector_arn,
382                log_group_arn_list: log_group_arns,
383                pattern_id,
384                pattern_string,
385                first_seen: now,
386                last_seen: now,
387                priority: priority.unwrap_or_else(|| "MEDIUM".to_string()),
388                state: "ACTIVE".to_string(),
389                suppressed: false,
390            },
391        );
392        anomaly_id
393    }
394}
395
396#[cfg(test)]
397mod tests {
398    use crate::service::test_helpers::*;
399    use serde_json::{json, Value};
400
401    // ---- Anomaly detectors ----
402
403    #[test]
404    fn anomaly_detector_lifecycle() {
405        let svc = make_service();
406
407        let req = make_request(
408            "CreateLogAnomalyDetector",
409            json!({
410                "logGroupArnList": ["arn:aws:logs:us-east-1:123456789012:log-group:test:*"],
411                "detectorName": "my-detector",
412            }),
413        );
414        let resp = svc.create_log_anomaly_detector(&req).unwrap();
415        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
416        let arn = body["anomalyDetectorArn"].as_str().unwrap().to_string();
417
418        let req = make_request(
419            "GetLogAnomalyDetector",
420            json!({ "anomalyDetectorArn": &arn }),
421        );
422        let resp = svc.get_log_anomaly_detector(&req).unwrap();
423        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
424        assert_eq!(body["detectorName"], "my-detector");
425
426        let req = make_request("ListLogAnomalyDetectors", json!({}));
427        let resp = svc.list_log_anomaly_detectors(&req).unwrap();
428        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
429        assert_eq!(body["anomalyDetectors"].as_array().unwrap().len(), 1);
430
431        let req = make_request(
432            "UpdateLogAnomalyDetector",
433            json!({ "anomalyDetectorArn": &arn, "enabled": false }),
434        );
435        svc.update_log_anomaly_detector(&req).unwrap();
436
437        let req = make_request(
438            "DeleteLogAnomalyDetector",
439            json!({ "anomalyDetectorArn": &arn }),
440        );
441        svc.delete_log_anomaly_detector(&req).unwrap();
442    }
443
444    #[test]
445    fn list_anomalies_returns_injected_entries() {
446        let svc = make_service();
447        let id = svc.inject_anomaly(
448            "123456789012",
449            "us-east-1",
450            "arn:aws:logs:us-east-1:123456789012:anomaly-detector:abc".to_string(),
451            vec!["arn:aws:logs:us-east-1:123456789012:log-group:test".to_string()],
452            "ERROR pattern".to_string(),
453            None,
454        );
455
456        let req = make_request("ListAnomalies", json!({}));
457        let resp = svc.list_anomalies(&req).unwrap();
458        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
459        let arr = body["anomalies"].as_array().unwrap();
460        assert_eq!(arr.len(), 1);
461        assert_eq!(arr[0]["anomalyId"], id);
462        assert_eq!(arr[0]["patternString"], "ERROR pattern");
463        assert_eq!(arr[0]["suppressed"], false);
464
465        let req = make_request(
466            "UpdateAnomaly",
467            json!({
468                "anomalyDetectorArn": "arn:aws:logs:us-east-1:123456789012:anomaly-detector:abc",
469                "anomalyId": id,
470                "suppressionType": "INFINITE",
471            }),
472        );
473        svc.update_anomaly(&req).unwrap();
474
475        let req = make_request("ListAnomalies", json!({"suppressionState": "SUPPRESSED"}));
476        let resp = svc.list_anomalies(&req).unwrap();
477        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
478        assert_eq!(body["anomalies"].as_array().unwrap().len(), 1);
479    }
480}