Skip to main content

fakecloud_eventbridge/
helpers.rs

1use super::*;
2
3/// Validate a single `PutEvents` entry's required fields (`Source`,
4/// `DetailType`, `Detail`) and that `Detail` is a well-formed JSON
5/// object. Returns the JSON error body AWS surfaces in the matching
6/// `Entries[]` slot on failure.
7pub(crate) fn validate_put_events_entry(
8    source: &str,
9    detail_type: &str,
10    detail: &str,
11) -> Result<(), Value> {
12    if source.is_empty() {
13        return Err(json!({
14            "ErrorCode": "InvalidArgument",
15            "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
16        }));
17    }
18    if detail_type.is_empty() {
19        return Err(json!({
20            "ErrorCode": "InvalidArgument",
21            "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
22        }));
23    }
24    if detail.is_empty() {
25        return Err(json!({
26            "ErrorCode": "InvalidArgument",
27            "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
28        }));
29    }
30    if serde_json::from_str::<Value>(detail).is_err() {
31        return Err(json!({
32            "ErrorCode": "MalformedDetail",
33            "ErrorMessage": "Detail is malformed.",
34        }));
35    }
36    Ok(())
37}
38
39/// Parse an entry's `Time` field, tolerating the three formats AWS
40/// accepts (RFC 3339 string, fractional seconds as a float, integer
41/// seconds). Falls back to "now" if the field is absent or
42/// unparseable, which matches the real service.
43pub(crate) fn parse_put_events_time(raw: &Value) -> DateTime<Utc> {
44    if let Some(s) = raw.as_str() {
45        return DateTime::parse_from_rfc3339(s)
46            .map(|dt| dt.with_timezone(&Utc))
47            .unwrap_or_else(|_| Utc::now());
48    }
49    if let Some(ts) = raw.as_f64() {
50        return DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
51            .unwrap_or_else(Utc::now);
52    }
53    if let Some(ts) = raw.as_i64() {
54        return DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now);
55    }
56    Utc::now()
57}
58
/// Report whether `action` is one of the EventBridge API actions that
/// mutate state. The comparison is exact and case-sensitive.
pub(crate) fn is_mutating_action(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateEventBus",
        "DeleteEventBus",
        "UpdateEventBus",
        "PutRule",
        "DeleteRule",
        "EnableRule",
        "DisableRule",
        "PutTargets",
        "RemoveTargets",
        "PutEvents",
        "PutPermission",
        "RemovePermission",
        "TagResource",
        "UntagResource",
        "CreateArchive",
        "UpdateArchive",
        "DeleteArchive",
        "CreateConnection",
        "UpdateConnection",
        "DeleteConnection",
        "DeauthorizeConnection",
        "CreateApiDestination",
        "UpdateApiDestination",
        "DeleteApiDestination",
        "StartReplay",
        "CancelReplay",
        "CreatePartnerEventSource",
        "DeletePartnerEventSource",
        "ActivateEventSource",
        "DeactivateEventSource",
        "PutPartnerEvents",
        "CreateEndpoint",
        "DeleteEndpoint",
        "UpdateEndpoint",
    ];
    MUTATING_ACTIONS.contains(&action)
}
99
100pub(crate) fn parse_tags(body: &Value) -> BTreeMap<String, String> {
101    let mut tags = BTreeMap::new();
102    if let Some(arr) = body["Tags"].as_array() {
103        for tag in arr {
104            if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
105                tags.insert(key.to_string(), val.to_string());
106            }
107        }
108    }
109    tags
110}
111
112pub fn parse_target(target: &Value) -> EventTarget {
113    EventTarget {
114        id: target["Id"].as_str().unwrap_or("").to_string(),
115        arn: target["Arn"].as_str().unwrap_or("").to_string(),
116        input: target["Input"].as_str().map(|s| s.to_string()),
117        input_path: target["InputPath"].as_str().map(|s| s.to_string()),
118        input_transformer: target.get("InputTransformer").cloned(),
119        sqs_parameters: target.get("SqsParameters").cloned(),
120        role_arn: target["RoleArn"].as_str().map(|s| s.to_string()),
121        dead_letter_config: target.get("DeadLetterConfig").cloned(),
122        retry_policy: target.get("RetryPolicy").cloned(),
123        ecs_parameters: target.get("EcsParameters").cloned(),
124        batch_parameters: target.get("BatchParameters").cloned(),
125        kinesis_parameters: target.get("KinesisParameters").cloned(),
126        redshift_data_parameters: target.get("RedshiftDataParameters").cloned(),
127        http_parameters: target.get("HttpParameters").cloned(),
128        sage_maker_pipeline_parameters: target.get("SageMakerPipelineParameters").cloned(),
129        app_sync_parameters: target.get("AppSyncParameters").cloned(),
130        run_command_parameters: target.get("RunCommandParameters").cloned(),
131    }
132}
133
134pub(crate) fn target_to_json(t: &EventTarget) -> Value {
135    let mut obj = json!({ "Id": t.id, "Arn": t.arn });
136    if let Some(ref input) = t.input {
137        obj["Input"] = json!(input);
138    }
139    if let Some(ref input_path) = t.input_path {
140        obj["InputPath"] = json!(input_path);
141    }
142    if let Some(ref it) = t.input_transformer {
143        obj["InputTransformer"] = it.clone();
144    }
145    if let Some(ref sp) = t.sqs_parameters {
146        obj["SqsParameters"] = sp.clone();
147    }
148    if let Some(ref ra) = t.role_arn {
149        obj["RoleArn"] = json!(ra);
150    }
151    if let Some(ref dlc) = t.dead_letter_config {
152        obj["DeadLetterConfig"] = dlc.clone();
153    }
154    if let Some(ref rp) = t.retry_policy {
155        obj["RetryPolicy"] = rp.clone();
156    }
157    if let Some(ref p) = t.ecs_parameters {
158        obj["EcsParameters"] = p.clone();
159    }
160    if let Some(ref p) = t.batch_parameters {
161        obj["BatchParameters"] = p.clone();
162    }
163    if let Some(ref p) = t.kinesis_parameters {
164        obj["KinesisParameters"] = p.clone();
165    }
166    if let Some(ref p) = t.redshift_data_parameters {
167        obj["RedshiftDataParameters"] = p.clone();
168    }
169    if let Some(ref p) = t.http_parameters {
170        obj["HttpParameters"] = p.clone();
171    }
172    if let Some(ref p) = t.sage_maker_pipeline_parameters {
173        obj["SageMakerPipelineParameters"] = p.clone();
174    }
175    if let Some(ref p) = t.app_sync_parameters {
176        obj["AppSyncParameters"] = p.clone();
177    }
178    if let Some(ref p) = t.run_command_parameters {
179        obj["RunCommandParameters"] = p.clone();
180    }
181    obj
182}
183
184pub(crate) fn find_tags_mut<'a>(
185    state: &'a mut crate::state::EventBridgeState,
186    arn: &str,
187) -> Result<&'a mut BTreeMap<String, String>, AwsServiceError> {
188    // Check buses
189    for bus in state.buses.values_mut() {
190        if bus.arn == arn {
191            return Ok(&mut bus.tags);
192        }
193    }
194    // Check rules
195    for rule in state.rules.values_mut() {
196        if rule.arn == arn {
197            return Ok(&mut rule.tags);
198        }
199    }
200
201    // Parse ARN to give better error messages
202    let error_msg = if arn.contains(":rule/") {
203        // Extract rule name and bus from ARN
204        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
205        if let Some(rule_path) = parts.first() {
206            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
207                format!("Rule {rule_name} does not exist on EventBus {bus}.")
208            } else {
209                format!("Rule {} does not exist on EventBus default.", rule_path)
210            }
211        } else {
212            format!("Resource {arn} not found.")
213        }
214    } else {
215        format!("Resource {arn} not found.")
216    };
217
218    Err(AwsServiceError::aws_error(
219        StatusCode::BAD_REQUEST,
220        "ResourceNotFoundException",
221        error_msg,
222    ))
223}
224
225pub(crate) fn find_tags<'a>(
226    state: &'a crate::state::EventBridgeState,
227    arn: &str,
228) -> Result<&'a BTreeMap<String, String>, AwsServiceError> {
229    for bus in state.buses.values() {
230        if bus.arn == arn {
231            return Ok(&bus.tags);
232        }
233    }
234    for rule in state.rules.values() {
235        if rule.arn == arn {
236            return Ok(&rule.tags);
237        }
238    }
239
240    let error_msg = if arn.contains(":rule/") {
241        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
242        if let Some(rule_path) = parts.first() {
243            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
244                format!("Rule {rule_name} does not exist on EventBus {bus}.")
245            } else {
246                format!("Rule {} does not exist on EventBus default.", rule_path)
247            }
248        } else {
249            format!("Resource {arn} not found.")
250        }
251    } else {
252        format!("Resource {arn} not found.")
253    };
254
255    Err(AwsServiceError::aws_error(
256        StatusCode::BAD_REQUEST,
257        "ResourceNotFoundException",
258        error_msg,
259    ))
260}
261
262pub(crate) fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
263    let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
264        AwsServiceError::aws_error(
265            StatusCode::BAD_REQUEST,
266            "InvalidEventPatternException",
267            "Event pattern is not valid. Reason: Invalid JSON",
268        )
269    })?;
270
271    validate_pattern_values(&parsed, "")?;
272    Ok(())
273}
274
275pub(crate) fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
276    match value {
277        Value::Object(obj) => {
278            for (key, val) in obj {
279                let new_path = if path.is_empty() {
280                    key.clone()
281                } else {
282                    format!("{path}.{key}")
283                };
284                match val {
285                    Value::Object(_) => validate_pattern_values(val, &new_path)?,
286                    Value::Array(_) => {} // Arrays are fine at leaf level
287                    _ => {
288                        return Err(AwsServiceError::aws_error(
289                            StatusCode::BAD_REQUEST,
290                            "InvalidEventPatternException",
291                            format!(
292                                "Event pattern is not valid. Reason: '{}' must be an object or an array",
293                                key
294                            ),
295                        ));
296                    }
297                }
298            }
299            Ok(())
300        }
301        _ => Ok(()),
302    }
303}
304
305pub(crate) fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
306    let mut resp = match auth_type {
307        "API_KEY" => {
308            let mut resp = json!({});
309            if let Some(api_key) = params.get("ApiKeyAuthParameters") {
310                resp["ApiKeyAuthParameters"] = json!({
311                    "ApiKeyName": api_key["ApiKeyName"],
312                });
313            }
314            resp
315        }
316        "BASIC" => {
317            let mut resp = json!({});
318            if let Some(basic) = params.get("BasicAuthParameters") {
319                resp["BasicAuthParameters"] = json!({
320                    "Username": basic["Username"],
321                });
322            }
323            resp
324        }
325        "OAUTH_CLIENT_CREDENTIALS" => {
326            let mut resp = json!({});
327            if let Some(oauth) = params.get("OAuthParameters") {
328                resp["OAuthParameters"] = json!({
329                    "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
330                    "HttpMethod": oauth["HttpMethod"],
331                    "ClientParameters": {
332                        "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
333                    },
334                });
335            }
336            resp
337        }
338        _ => return params.clone(),
339    };
340
341    // Echo the connection-level InvocationHttpParameters (additional custom
342    // headers / query-string / body parameters merged into every invocation).
343    // Previously dropped, which made DescribeConnection lose everything the
344    // caller configured beyond the auth block. Secret values are hidden on
345    // describe (matching AWS), but keys + IsValueSecret flags round-trip.
346    if let Some(inv) = params.get("InvocationHttpParameters") {
347        resp["InvocationHttpParameters"] = sanitize_invocation_http_params(inv);
348    }
349    resp
350}
351
352/// Redact secret values from a connection's `InvocationHttpParameters` for
353/// read responses. Each parameter keeps its `Key` and `IsValueSecret` flag;
354/// the `Value` is only echoed when the parameter is explicitly non-secret.
355fn sanitize_invocation_http_params(inv: &Value) -> Value {
356    let mut out = json!({});
357    for field in [
358        "HeaderParameters",
359        "QueryStringParameters",
360        "BodyParameters",
361    ] {
362        if let Some(arr) = inv.get(field).and_then(|v| v.as_array()) {
363            let sanitized: Vec<Value> = arr
364                .iter()
365                .map(|p| {
366                    let is_secret = p
367                        .get("IsValueSecret")
368                        .and_then(|v| v.as_bool())
369                        .unwrap_or(true);
370                    let mut entry = json!({
371                        "Key": p.get("Key").cloned().unwrap_or(Value::Null),
372                        "IsValueSecret": is_secret,
373                    });
374                    if !is_secret {
375                        if let Some(val) = p.get("Value") {
376                            entry["Value"] = val.clone();
377                        }
378                    }
379                    entry
380                })
381                .collect();
382            out[field] = json!(sanitized);
383        }
384    }
385    out
386}
387
388/// Match an event against an EventBridge event pattern.
389pub(crate) fn matches_pattern(
390    pattern_json: Option<&str>,
391    source: &str,
392    detail_type: &str,
393    detail: &str,
394    account: &str,
395    region: &str,
396    resources: &[String],
397) -> bool {
398    let pattern_json = match pattern_json {
399        Some(p) => p,
400        None => return true,
401    };
402
403    let pattern: Value = match serde_json::from_str(pattern_json) {
404        Ok(v) => v,
405        Err(_) => return false,
406    };
407
408    if !pattern.is_object() {
409        return false;
410    }
411
412    let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
413    let event = json!({
414        "source": source,
415        "detail-type": detail_type,
416        "detail": detail_value,
417        "account": account,
418        "region": region,
419        "resources": resources,
420    });
421
422    matches_value(&pattern, &event)
423}
424
425pub(crate) fn matches_value(pattern: &Value, event_value: &Value) -> bool {
426    match pattern {
427        Value::Object(obj) => {
428            // `$or` is a sibling-level alternation: any alternative pattern
429            // matched against this same event level passes the whole object.
430            if let Some(Value::Array(alternatives)) = obj.get("$or") {
431                return alternatives
432                    .iter()
433                    .any(|alt| matches_value(alt, event_value));
434            }
435            for (key, sub_pattern) in obj {
436                if key == "$or" {
437                    continue;
438                }
439                let sub_value = &event_value[key];
440                if !matches_value(sub_pattern, sub_value) {
441                    return false;
442                }
443            }
444            true
445        }
446        Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
447        _ => false,
448    }
449}
450
451pub(crate) fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
452    match pattern_elem {
453        Value::Object(obj) => {
454            if let Some(prefix_val) = obj.get("prefix") {
455                if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
456                    return actual.starts_with(prefix);
457                }
458                return false;
459            }
460            if let Some(suffix_val) = obj.get("suffix") {
461                if let (Some(suffix), Some(actual)) = (suffix_val.as_str(), event_value.as_str()) {
462                    return actual.ends_with(suffix);
463                }
464                return false;
465            }
466            if let Some(eqic_val) = obj.get("equals-ignore-case") {
467                if let (Some(expected), Some(actual)) = (eqic_val.as_str(), event_value.as_str()) {
468                    return expected.eq_ignore_ascii_case(actual);
469                }
470                return false;
471            }
472            if let Some(cidr_val) = obj.get("cidr") {
473                if let (Some(cidr), Some(actual)) = (cidr_val.as_str(), event_value.as_str()) {
474                    return cidr_matches(cidr, actual);
475                }
476                return false;
477            }
478            if let Some(wild_val) = obj.get("wildcard") {
479                if let (Some(pattern), Some(actual)) = (wild_val.as_str(), event_value.as_str()) {
480                    return wildcard_matches(pattern, actual);
481                }
482                return false;
483            }
484            if let Some(exists_val) = obj.get("exists") {
485                let should_exist = exists_val.as_bool().unwrap_or(true);
486                let does_exist = !event_value.is_null();
487                return should_exist == does_exist;
488            }
489            if let Some(anything_but_val) = obj.get("anything-but") {
490                return match anything_but_val {
491                    Value::String(s) => event_value.as_str() != Some(s.as_str()),
492                    Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
493                    Value::Number(_) => event_value != anything_but_val,
494                    // Nested-matcher negation, e.g.
495                    // {"anything-but": {"prefix": "x"}} matches when the
496                    // field does NOT start with "x". Previously the object
497                    // form always matched, so excluded events were still
498                    // delivered (bug-audit 2026-05-28, 1.14).
499                    Value::Object(nested) => {
500                        let fv = event_value.as_str();
501                        if let Some(p) = nested.get("prefix").and_then(|v| v.as_str()) {
502                            !fv.is_some_and(|s| s.starts_with(p))
503                        } else if let Some(suf) = nested.get("suffix").and_then(|v| v.as_str()) {
504                            !fv.is_some_and(|s| s.ends_with(suf))
505                        } else if let Some(eic) =
506                            nested.get("equals-ignore-case").and_then(|v| v.as_str())
507                        {
508                            !fv.is_some_and(|s| s.eq_ignore_ascii_case(eic))
509                        } else {
510                            // Unknown nested matcher: default to matching.
511                            true
512                        }
513                    }
514                    _ => true,
515                };
516            }
517            if let Some(numeric_val) = obj.get("numeric") {
518                return matches_numeric(numeric_val, event_value);
519            }
520            false
521        }
522        _ => values_equal(pattern_elem, event_value),
523    }
524}
525
/// `wildcard` matcher: `*` matches any run of characters (including
/// empty); a backslash makes the following character literal, so `\*` is
/// a literal asterisk. A trailing lone backslash is ignored.
///
/// The pattern must cover the whole of `actual`: the text before the
/// first `*` is a required prefix, the text after the last `*` a required
/// suffix, and the pieces in between must appear in order in the middle.
pub(crate) fn wildcard_matches(pattern: &str, actual: &str) -> bool {
    // Split the pattern into literal pieces at each unescaped `*`.
    let mut pieces: Vec<String> = Vec::new();
    let mut piece = String::new();
    let mut escaped = false;
    for ch in pattern.chars() {
        if escaped {
            piece.push(ch);
            escaped = false;
        } else if ch == '\\' {
            escaped = true;
        } else if ch == '*' {
            pieces.push(std::mem::take(&mut piece));
        } else {
            piece.push(ch);
        }
    }
    pieces.push(piece);

    let [head, middle @ .., tail] = pieces.as_slice() else {
        // Exactly one piece: no unescaped `*`, so this is plain equality.
        return pieces.first().is_some_and(|only| only.as_str() == actual);
    };

    // Anchor the first piece at the start of the input.
    let Some(mut remainder) = actual.strip_prefix(head.as_str()) else {
        return false;
    };

    // Consume each middle piece at its leftmost occurrence.
    for part in middle {
        match remainder.find(part.as_str()) {
            Some(at) => remainder = &remainder[at + part.len()..],
            None => return false,
        }
    }

    // The trailing piece must match the tail of what is left.
    remainder.ends_with(tail.as_str())
}
572
573/// IPv4 CIDR membership test for the `cidr` filter.
574pub(crate) fn cidr_matches(cidr: &str, actual: &str) -> bool {
575    let (net_str, prefix_str) = match cidr.split_once('/') {
576        Some(parts) => parts,
577        None => return false,
578    };
579    let prefix: u32 = match prefix_str.parse() {
580        Ok(p) if p <= 32 => p,
581        _ => return false,
582    };
583    let net = match parse_ipv4(net_str) {
584        Some(n) => n,
585        None => return false,
586    };
587    let value = match parse_ipv4(actual) {
588        Some(v) => v,
589        None => return false,
590    };
591    if prefix == 0 {
592        return true;
593    }
594    let mask = u32::MAX << (32 - prefix);
595    (net & mask) == (value & mask)
596}
597
/// Parse a dotted-quad IPv4 address into its 32-bit value, with the
/// first octet in the most significant byte.
///
/// Returns `None` for anything the standard library's `Ipv4Addr` parser
/// rejects: wrong number of octets, octets above 255, empty octets, or
/// octets carrying a sign such as `+10`.
fn parse_ipv4(s: &str) -> Option<u32> {
    s.parse::<std::net::Ipv4Addr>().ok().map(u32::from)
}
613
614/// For each archive on `event_bus_name` whose event pattern matches the
615/// event, append a clone of it to the archive's stored events and bump
616/// the archive's counters.
617#[allow(clippy::too_many_arguments)]
618pub(crate) fn archive_matching_event(
619    state: &mut crate::state::EventBridgeState,
620    event: &PutEvent,
621    event_bus_name: &str,
622    source: &str,
623    detail_type: &str,
624    detail: &str,
625    account_id: &str,
626    region: &str,
627    resources: &[String],
628) {
629    let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
630    for akey in archive_keys {
631        let (archive_bus, archive_pattern, archive_enabled) = {
632            let a = &state.archives[&akey];
633            (
634                state.resolve_bus_name(&a.event_source_arn),
635                a.event_pattern.clone(),
636                a.state == "ENABLED",
637            )
638        };
639        if archive_bus != event_bus_name || !archive_enabled {
640            continue;
641        }
642        let pattern_matches = matches_pattern(
643            archive_pattern.as_deref(),
644            source,
645            detail_type,
646            detail,
647            account_id,
648            region,
649            resources,
650        );
651        if !pattern_matches {
652            continue;
653        }
654        if let Some(archive) = state.archives.get_mut(&akey) {
655            archive.event_count += 1;
656            archive.size_bytes += detail.len() as i64;
657            archive.events.push(event.clone());
658        }
659    }
660}
661
662/// Walk the named archive, filter events into the replay window, then
663/// fan out each event against rules on `bus_name` to collect its
664/// matching targets. Returns only events that matched at least one
665/// target.
666#[allow(clippy::too_many_arguments)]
667pub(crate) fn collect_replay_events_with_targets(
668    state: &crate::state::EventBridgeState,
669    archive_name: &str,
670    bus_name: &str,
671    event_start_time: DateTime<Utc>,
672    event_end_time: DateTime<Utc>,
673    account_id: &str,
674    region: &str,
675) -> Vec<(PutEvent, Vec<EventTarget>)> {
676    let Some(archive) = state.archives.get(archive_name) else {
677        return Vec::new();
678    };
679
680    let replay_events: Vec<PutEvent> = archive
681        .events
682        .iter()
683        .filter(|e| e.time >= event_start_time && e.time < event_end_time)
684        .cloned()
685        .collect();
686
687    let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
688    for event in replay_events {
689        let matching_targets: Vec<EventTarget> = state
690            .rules
691            .values()
692            .filter(|r| {
693                r.event_bus_name == bus_name
694                    && r.state == "ENABLED"
695                    && matches_pattern(
696                        r.event_pattern.as_deref(),
697                        &event.source,
698                        &event.detail_type,
699                        &event.detail,
700                        account_id,
701                        region,
702                        &event.resources,
703                    )
704            })
705            .flat_map(|r| r.targets.clone())
706            .collect();
707
708        if !matching_targets.is_empty() {
709            events_to_deliver.push((event, matching_targets));
710        }
711    }
712    events_to_deliver
713}
714
715pub(crate) fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
716    let arr = match numeric_arr.as_array() {
717        Some(a) => a,
718        None => return false,
719    };
720    let actual = match event_value.as_f64() {
721        Some(n) => n,
722        None => return false,
723    };
724    let mut i = 0;
725    while i + 1 < arr.len() {
726        let op = match arr[i].as_str() {
727            Some(s) => s,
728            None => return false,
729        };
730        let threshold = match arr[i + 1].as_f64() {
731            Some(n) => n,
732            None => return false,
733        };
734        let ok = match op {
735            ">" => actual > threshold,
736            ">=" => actual >= threshold,
737            "<" => actual < threshold,
738            "<=" => actual <= threshold,
739            "=" => (actual - threshold).abs() < f64::EPSILON,
740            _ => return false,
741        };
742        if !ok {
743            return false;
744        }
745        i += 2;
746    }
747    true
748}
749
750pub(crate) fn values_equal(a: &Value, b: &Value) -> bool {
751    a == b
752}
753
754/// Resolve a simple JSON path like `$.detail.name` against an event JSON value.
755pub(crate) fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
756    let path = path.strip_prefix('$').unwrap_or(path);
757    let mut current = event;
758    for segment in path.split('.') {
759        if segment.is_empty() {
760            continue;
761        }
762        current = current.get(segment)?;
763    }
764    Some(current.clone())
765}
766
767/// Apply an EventBridge InputTransformer to an event.
768pub(crate) fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
769    let input_paths_map = transformer
770        .get("InputPathsMap")
771        .and_then(|v| v.as_object())
772        .cloned()
773        .unwrap_or_default();
774    let template = transformer
775        .get("InputTemplate")
776        .and_then(|v| v.as_str())
777        .unwrap_or("")
778        .to_string();
779
780    // Resolve all input paths
781    let mut resolved: HashMap<String, Value> = HashMap::new();
782    for (var_name, path_val) in &input_paths_map {
783        if let Some(path_str) = path_val.as_str() {
784            if let Some(val) = resolve_json_path(event, path_str) {
785                resolved.insert(var_name.clone(), val);
786            }
787        }
788    }
789
790    // Replace <varName> placeholders in template
791    let mut result = template;
792    for (var_name, val) in &resolved {
793        let placeholder = format!("<{var_name}>");
794        let replacement = match val {
795            Value::String(s) => s.clone(),
796            other => other.to_string(),
797        };
798        result = result.replace(&placeholder, &replacement);
799    }
800
801    result
802}
803
804pub(crate) fn missing(name: &str) -> AwsServiceError {
805    AwsServiceError::aws_error(
806        StatusCode::BAD_REQUEST,
807        "ValidationException",
808        format!("The request must contain the parameter {name}"),
809    )
810}
811
/// Extract a Lambda function name from its ARN.
///
/// Handles both unqualified (`arn:aws:lambda:region:account:function:NAME`)
/// and qualified (`arn:aws:lambda:region:account:function:NAME:alias`) ARNs.
/// Anything that does not have `function` as its sixth colon-separated
/// field followed by a name is returned unchanged.
pub(crate) fn function_name_from_arn(arn: &str) -> &str {
    // Skip partition, service, region and account; look at fields 6 and 7.
    let mut fields = arn.split(':').skip(5);
    match (fields.next(), fields.next()) {
        (Some("function"), Some(name)) => name,
        _ => arn,
    }
}
824
/// Spawn a background task to invoke a Lambda function via ContainerRuntime.
/// This is fire-and-forget: EventBridge delivery is asynchronous.
///
/// Does nothing when either `container_runtime` or `lambda_state` is
/// `None`. The function is looked up by the name extracted from
/// `function_arn`; `payload` is passed to the runtime as raw bytes. The
/// outcome is only logged (not found / succeeded / failed); nothing is
/// returned to the caller.
pub(crate) fn invoke_lambda_async(
    container_runtime: &Option<Arc<ContainerRuntime>>,
    lambda_state: &Option<SharedLambdaState>,
    function_arn: &str,
    payload: &str,
) {
    // Without a runtime or Lambda state there is nothing to invoke.
    let runtime = match container_runtime {
        Some(rt) => rt.clone(),
        None => return,
    };
    let lambda_state = match lambda_state {
        Some(ls) => ls.clone(),
        None => return,
    };
    // Owned copies, because the spawned task takes ownership via `async move`.
    let func_name = function_name_from_arn(function_arn).to_string();
    let payload = payload.as_bytes().to_vec();

    tokio::spawn(async move {
        // The lookup runs inside its own block so that `accounts` (the
        // value returned by `lambda_state.read()`) goes out of scope
        // before the `.await` below.
        let resolved = {
            let accounts = lambda_state.read();
            // NOTE(review): the function is looked up in `default_ref()`
            // only; the account in the ARN is not consulted. Presumably
            // intentional, confirm.
            let state = accounts.default_ref();
            state.functions.get(&func_name).cloned().map(|func| {
                // Collect the code zip of every attached layer that can
                // be resolved; layers whose ARN does not parse, or whose
                // account/layer/version/zip is missing, are skipped.
                let mut layer_zips: Vec<Vec<u8>> = Vec::with_capacity(func.layers.len());
                for attached in &func.layers {
                    if let Some(bytes) = fakecloud_lambda::extras::parse_layer_version_arn(
                        &attached.arn,
                    )
                    .and_then(|(acct, name, ver)| {
                        accounts
                            .get(&acct)
                            .and_then(|s| s.layers.get(&name))
                            .and_then(|l| l.versions.iter().find(|v| v.version == ver))
                            .and_then(|v| v.code_zip.clone())
                    }) {
                        layer_zips.push(bytes);
                    }
                }
                (func, layer_zips)
            })
        };
        let (func, layer_zips) = match resolved {
            Some(pair) => pair,
            None => {
                // Unknown function: log and drop the delivery.
                tracing::warn!(
                    function = %func_name,
                    "EventBridge Lambda target not found, skipping invocation"
                );
                return;
            }
        };
        // The invocation result itself is discarded; only success or
        // failure is logged.
        match runtime.invoke(&func, &payload, &layer_zips).await {
            Ok(_) => {
                tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
            }
            Err(e) => {
                tracing::warn!(
                    function = %func_name,
                    error = %e,
                    "EventBridge Lambda invocation failed"
                );
            }
        }
    });
}
891
892/// Deliver an EventBridge event to CloudWatch Logs by writing a log event
893/// to the appropriate log group and stream.
894pub(crate) fn deliver_to_logs(
895    logs_state: &SharedLogsState,
896    log_group_arn: &str,
897    payload: &str,
898    timestamp: chrono::DateTime<chrono::Utc>,
899) {
900    // Extract log group name from ARN: arn:aws:logs:region:account:log-group:NAME
901    // or just the name if it's not an ARN
902    let group_name = if log_group_arn.contains(":log-group:") {
903        log_group_arn
904            .split(":log-group:")
905            .nth(1)
906            .unwrap_or(log_group_arn)
907            .trim_end_matches(":*")
908    } else {
909        log_group_arn
910    };
911
912    let stream_name = "events".to_string();
913    let ts_millis = timestamp.timestamp_millis();
914
915    let mut accounts = logs_state.write();
916    let state = accounts.default_mut();
917    let region = state.region.clone();
918    let account_id = state.account_id.clone();
919
920    // Auto-create log group and stream if they don't exist
921    let group = state
922        .log_groups
923        .entry(group_name.to_string())
924        .or_insert_with(|| fakecloud_logs::LogGroup {
925            name: group_name.to_string(),
926            arn: Arn::new(
927                "logs",
928                &region,
929                &account_id,
930                &format!("log-group:{group_name}"),
931            )
932            .to_string(),
933            creation_time: ts_millis,
934            retention_in_days: None,
935            kms_key_id: None,
936            tags: std::collections::BTreeMap::new(),
937            log_streams: std::collections::BTreeMap::new(),
938            stored_bytes: 0,
939            subscription_filters: Vec::new(),
940            data_protection_policy: None,
941            index_policies: Vec::new(),
942            transformer: None,
943            deletion_protection: false,
944            log_group_class: Some("STANDARD".to_string()),
945        });
946
947    let stream = group
948        .log_streams
949        .entry(stream_name.clone())
950        .or_insert_with(|| fakecloud_logs::LogStream {
951            name: stream_name,
952            arn: format!("{}:log-stream:events", group.arn),
953            creation_time: ts_millis,
954            first_event_timestamp: None,
955            last_event_timestamp: None,
956            last_ingestion_time: None,
957            upload_sequence_token: "1".to_string(),
958            events: Vec::new(),
959        });
960
961    stream.events.push(fakecloud_logs::LogEvent {
962        timestamp: ts_millis,
963        message: payload.to_string(),
964        ingestion_time: ts_millis,
965    });
966    stream.last_event_timestamp = Some(ts_millis);
967    stream.last_ingestion_time = Some(ts_millis);
968    if stream.first_event_timestamp.is_none() {
969        stream.first_event_timestamp = Some(ts_millis);
970    }
971}
972
973/// Apply connection auth parameters to an outgoing HTTP request.
974pub(crate) fn apply_connection_auth(
975    mut builder: reqwest::RequestBuilder,
976    conn: &Connection,
977) -> reqwest::RequestBuilder {
978    match conn.authorization_type.as_str() {
979        "API_KEY" => {
980            if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
981                if let (Some(name), Some(value)) = (
982                    params["ApiKeyName"].as_str(),
983                    params["ApiKeyValue"].as_str(),
984                ) {
985                    builder = builder.header(name, value);
986                }
987            }
988        }
989        "BASIC" => {
990            if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
991                if let (Some(user), Some(pass)) =
992                    (params["Username"].as_str(), params["Password"].as_str())
993                {
994                    builder = builder.basic_auth(user, Some(pass));
995                }
996            }
997        }
998        "OAUTH_CLIENT_CREDENTIALS" => {
999            // For OAuth, in a real implementation we'd exchange credentials for a token.
1000            // Here we pass client credentials as basic auth as a reasonable approximation.
1001            if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
1002                if let (Some(client_id), Some(client_secret)) = (
1003                    params["ClientParameters"]["ClientID"].as_str(),
1004                    params["ClientParameters"]["ClientSecret"].as_str(),
1005                ) {
1006                    builder = builder.basic_auth(client_id, Some(client_secret));
1007                }
1008            }
1009        }
1010        _ => {}
1011    }
1012    builder
1013}
1014
1015/// Context shared by both put_events (direct) and put_event_in_account
1016/// (cross-service) when dispatching matched targets. Optional state
1017/// handles let cross-service callers (which may not be wired with full
1018/// service plumbing) gracefully degrade — e.g. Lambda dispatch becomes
1019/// a fire-and-forget log unless `lambda_state` is wired.
1020pub(crate) struct EventDispatchContext<'a> {
1021    pub(crate) state: &'a crate::state::SharedEventBridgeState,
1022    pub(crate) delivery: &'a std::sync::Arc<fakecloud_core::delivery::DeliveryBus>,
1023    pub(crate) lambda_state: Option<&'a fakecloud_lambda::SharedLambdaState>,
1024    pub(crate) logs_state: Option<&'a fakecloud_logs::SharedLogsState>,
1025    pub(crate) container_runtime:
1026        &'a Option<std::sync::Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
1027    pub(crate) account_id: &'a str,
1028    pub(crate) region: &'a str,
1029}
1030
1031/// Single-target dispatch shared by direct PutEvents and cross-service
1032/// put_event_in_account so both honour the same target shape (SQS/SNS/
1033/// Lambda/Logs/Kinesis/StepFunctions/ApiDestination/HTTP) and the same
1034/// InputTransformer + InputPath body resolution.
1035pub(crate) fn dispatch_event_target(
1036    ctx: &EventDispatchContext,
1037    target: &crate::state::EventTarget,
1038    event_json: &Value,
1039    event_id: &str,
1040    detail_type: &str,
1041) {
1042    let arn = &target.arn;
1043    let event_str = event_json.to_string();
1044    let body_str = if let Some(ref transformer) = target.input_transformer {
1045        apply_input_transformer(transformer, event_json)
1046    } else if let Some(ref input) = target.input {
1047        input.clone()
1048    } else if let Some(ref input_path) = target.input_path {
1049        resolve_json_path(event_json, input_path)
1050            .map(|v| v.to_string())
1051            .unwrap_or_else(|| event_str.clone())
1052    } else {
1053        event_str.clone()
1054    };
1055
1056    if arn.contains(":sqs:") {
1057        let group_id = target
1058            .sqs_parameters
1059            .as_ref()
1060            .and_then(|p| p["MessageGroupId"].as_str())
1061            .map(|s| s.to_string());
1062        if group_id.is_some() {
1063            ctx.delivery.send_to_sqs_with_attrs(
1064                arn,
1065                &body_str,
1066                &HashMap::new(),
1067                group_id.as_deref(),
1068                None,
1069            );
1070        } else {
1071            ctx.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
1072        }
1073    } else if arn.contains(":sns:") {
1074        ctx.delivery
1075            .publish_to_sns(arn, &body_str, Some(detail_type));
1076    } else if arn.contains(":lambda:") {
1077        tracing::info!(
1078            function_arn = %arn,
1079            payload = %body_str,
1080            "EventBridge delivering to Lambda function"
1081        );
1082        let now = chrono::Utc::now();
1083        {
1084            let mut accounts = ctx.state.write();
1085            let s = accounts.get_or_create(ctx.account_id);
1086            s.lambda_invocations.push(crate::state::LambdaInvocation {
1087                function_arn: arn.clone(),
1088                payload: body_str.clone(),
1089                timestamp: now,
1090            });
1091        }
1092        if let Some(ls) = ctx.lambda_state {
1093            ls.write()
1094                .default_mut()
1095                .invocations
1096                .push(fakecloud_lambda::LambdaInvocation {
1097                    function_arn: arn.clone(),
1098                    payload: body_str.clone(),
1099                    timestamp: now,
1100                    source: "aws:events".to_string(),
1101                });
1102        }
1103        invoke_lambda_async(
1104            ctx.container_runtime,
1105            &ctx.lambda_state.cloned(),
1106            arn,
1107            &body_str,
1108        );
1109    } else if arn.contains(":logs:") {
1110        tracing::info!(
1111            log_group_arn = %arn,
1112            payload = %body_str,
1113            "EventBridge delivering to CloudWatch Logs"
1114        );
1115        let now = chrono::Utc::now();
1116        {
1117            let mut accounts = ctx.state.write();
1118            let s = accounts.get_or_create(ctx.account_id);
1119            s.log_deliveries.push(crate::state::LogDelivery {
1120                log_group_arn: arn.clone(),
1121                payload: body_str.clone(),
1122                timestamp: now,
1123            });
1124        }
1125        if let Some(log_state) = ctx.logs_state {
1126            deliver_to_logs(log_state, arn, &body_str, now);
1127        }
1128    } else if arn.contains(":kinesis:") {
1129        tracing::info!(
1130            stream_arn = %arn,
1131            "EventBridge delivering to Kinesis stream"
1132        );
1133        ctx.delivery.send_to_kinesis(arn, &body_str, event_id);
1134    } else if arn.contains(":states:") {
1135        tracing::info!(
1136            state_machine_arn = %arn,
1137            "EventBridge delivering to Step Functions"
1138        );
1139        ctx.delivery.start_stepfunctions_execution(arn, &body_str);
1140        let mut accounts = ctx.state.write();
1141        let s = accounts.get_or_create(ctx.account_id);
1142        s.step_function_executions
1143            .push(crate::state::StepFunctionExecution {
1144                state_machine_arn: arn.clone(),
1145                payload: body_str.clone(),
1146                timestamp: chrono::Utc::now(),
1147            });
1148    } else if arn.contains(":api-destination/") {
1149        let accounts = ctx.state.read();
1150        let empty = crate::state::EventBridgeState::new(ctx.account_id, ctx.region);
1151        let s = accounts.get(ctx.account_id).unwrap_or(&empty);
1152        let dest = s.api_destinations.values().find(|d| d.arn == *arn).cloned();
1153        let conn = dest.as_ref().and_then(|d| {
1154            s.connections
1155                .values()
1156                .find(|c| c.arn == d.connection_arn)
1157                .cloned()
1158        });
1159        drop(accounts);
1160        if let Some(dest) = dest {
1161            let url = dest.invocation_endpoint;
1162            let method = dest.http_method;
1163            let payload = body_str.clone();
1164            tokio::spawn(async move {
1165                let client = reqwest::Client::new();
1166                let mut req_builder = match method.as_str() {
1167                    "GET" => client.get(&url),
1168                    "PUT" => client.put(&url),
1169                    "DELETE" => client.delete(&url),
1170                    "PATCH" => client.patch(&url),
1171                    "HEAD" => client.head(&url),
1172                    _ => client.post(&url),
1173                };
1174                req_builder = req_builder.header("Content-Type", "application/json");
1175                if let Some(conn) = conn {
1176                    req_builder = apply_connection_auth(req_builder, &conn);
1177                }
1178                let result = req_builder.body(payload).send().await;
1179                if let Err(e) = result {
1180                    tracing::warn!(
1181                        endpoint = %url,
1182                        error = %e,
1183                        "EventBridge ApiDestination delivery failed"
1184                    );
1185                }
1186            });
1187        }
1188    } else if arn.starts_with("https://") || arn.starts_with("http://") {
1189        let url = arn.clone();
1190        let payload = body_str.clone();
1191        tokio::spawn(async move {
1192            let client = reqwest::Client::new();
1193            let result = client
1194                .post(&url)
1195                .header("Content-Type", "application/json")
1196                .body(payload)
1197                .send()
1198                .await;
1199            if let Err(e) = result {
1200                tracing::warn!(
1201                    endpoint = %url,
1202                    error = %e,
1203                    "EventBridge HTTP target delivery failed"
1204                );
1205            }
1206        });
1207    }
1208}