Skip to main content

fakecloud_eventbridge/
helpers.rs

1use super::*;
2
3/// Validate a single `PutEvents` entry's required fields (`Source`,
4/// `DetailType`, `Detail`) and that `Detail` is a well-formed JSON
5/// object. Returns the JSON error body AWS surfaces in the matching
6/// `Entries[]` slot on failure.
7pub(crate) fn validate_put_events_entry(
8    source: &str,
9    detail_type: &str,
10    detail: &str,
11) -> Result<(), Value> {
12    if source.is_empty() {
13        return Err(json!({
14            "ErrorCode": "InvalidArgument",
15            "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
16        }));
17    }
18    if detail_type.is_empty() {
19        return Err(json!({
20            "ErrorCode": "InvalidArgument",
21            "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
22        }));
23    }
24    if detail.is_empty() {
25        return Err(json!({
26            "ErrorCode": "InvalidArgument",
27            "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
28        }));
29    }
30    if serde_json::from_str::<Value>(detail).is_err() {
31        return Err(json!({
32            "ErrorCode": "MalformedDetail",
33            "ErrorMessage": "Detail is malformed.",
34        }));
35    }
36    Ok(())
37}
38
39/// Parse an entry's `Time` field, tolerating the three formats AWS
40/// accepts (RFC 3339 string, fractional seconds as a float, integer
41/// seconds). Falls back to "now" if the field is absent or
42/// unparseable, which matches the real service.
43pub(crate) fn parse_put_events_time(raw: &Value) -> DateTime<Utc> {
44    if let Some(s) = raw.as_str() {
45        return DateTime::parse_from_rfc3339(s)
46            .map(|dt| dt.with_timezone(&Utc))
47            .unwrap_or_else(|_| Utc::now());
48    }
49    if let Some(ts) = raw.as_f64() {
50        return DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
51            .unwrap_or_else(Utc::now);
52    }
53    if let Some(ts) = raw.as_i64() {
54        return DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now);
55    }
56    Utc::now()
57}
58
59/// Actions that mutate EventBridge state.
60pub(crate) fn is_mutating_action(action: &str) -> bool {
61    matches!(
62        action,
63        "CreateEventBus"
64            | "DeleteEventBus"
65            | "UpdateEventBus"
66            | "PutRule"
67            | "DeleteRule"
68            | "EnableRule"
69            | "DisableRule"
70            | "PutTargets"
71            | "RemoveTargets"
72            | "PutEvents"
73            | "PutPermission"
74            | "RemovePermission"
75            | "TagResource"
76            | "UntagResource"
77            | "CreateArchive"
78            | "UpdateArchive"
79            | "DeleteArchive"
80            | "CreateConnection"
81            | "UpdateConnection"
82            | "DeleteConnection"
83            | "DeauthorizeConnection"
84            | "CreateApiDestination"
85            | "UpdateApiDestination"
86            | "DeleteApiDestination"
87            | "StartReplay"
88            | "CancelReplay"
89            | "CreatePartnerEventSource"
90            | "DeletePartnerEventSource"
91            | "ActivateEventSource"
92            | "DeactivateEventSource"
93            | "PutPartnerEvents"
94            | "CreateEndpoint"
95            | "DeleteEndpoint"
96            | "UpdateEndpoint"
97    )
98}
99
100pub(crate) fn parse_tags(body: &Value) -> BTreeMap<String, String> {
101    let mut tags = BTreeMap::new();
102    if let Some(arr) = body["Tags"].as_array() {
103        for tag in arr {
104            if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
105                tags.insert(key.to_string(), val.to_string());
106            }
107        }
108    }
109    tags
110}
111
112pub fn parse_target(target: &Value) -> EventTarget {
113    EventTarget {
114        id: target["Id"].as_str().unwrap_or("").to_string(),
115        arn: target["Arn"].as_str().unwrap_or("").to_string(),
116        input: target["Input"].as_str().map(|s| s.to_string()),
117        input_path: target["InputPath"].as_str().map(|s| s.to_string()),
118        input_transformer: target.get("InputTransformer").cloned(),
119        sqs_parameters: target.get("SqsParameters").cloned(),
120        role_arn: target["RoleArn"].as_str().map(|s| s.to_string()),
121        dead_letter_config: target.get("DeadLetterConfig").cloned(),
122        retry_policy: target.get("RetryPolicy").cloned(),
123        ecs_parameters: target.get("EcsParameters").cloned(),
124        batch_parameters: target.get("BatchParameters").cloned(),
125        kinesis_parameters: target.get("KinesisParameters").cloned(),
126        redshift_data_parameters: target.get("RedshiftDataParameters").cloned(),
127        http_parameters: target.get("HttpParameters").cloned(),
128        sage_maker_pipeline_parameters: target.get("SageMakerPipelineParameters").cloned(),
129        app_sync_parameters: target.get("AppSyncParameters").cloned(),
130        run_command_parameters: target.get("RunCommandParameters").cloned(),
131    }
132}
133
134pub(crate) fn target_to_json(t: &EventTarget) -> Value {
135    let mut obj = json!({ "Id": t.id, "Arn": t.arn });
136    if let Some(ref input) = t.input {
137        obj["Input"] = json!(input);
138    }
139    if let Some(ref input_path) = t.input_path {
140        obj["InputPath"] = json!(input_path);
141    }
142    if let Some(ref it) = t.input_transformer {
143        obj["InputTransformer"] = it.clone();
144    }
145    if let Some(ref sp) = t.sqs_parameters {
146        obj["SqsParameters"] = sp.clone();
147    }
148    if let Some(ref ra) = t.role_arn {
149        obj["RoleArn"] = json!(ra);
150    }
151    if let Some(ref dlc) = t.dead_letter_config {
152        obj["DeadLetterConfig"] = dlc.clone();
153    }
154    if let Some(ref rp) = t.retry_policy {
155        obj["RetryPolicy"] = rp.clone();
156    }
157    if let Some(ref p) = t.ecs_parameters {
158        obj["EcsParameters"] = p.clone();
159    }
160    if let Some(ref p) = t.batch_parameters {
161        obj["BatchParameters"] = p.clone();
162    }
163    if let Some(ref p) = t.kinesis_parameters {
164        obj["KinesisParameters"] = p.clone();
165    }
166    if let Some(ref p) = t.redshift_data_parameters {
167        obj["RedshiftDataParameters"] = p.clone();
168    }
169    if let Some(ref p) = t.http_parameters {
170        obj["HttpParameters"] = p.clone();
171    }
172    if let Some(ref p) = t.sage_maker_pipeline_parameters {
173        obj["SageMakerPipelineParameters"] = p.clone();
174    }
175    if let Some(ref p) = t.app_sync_parameters {
176        obj["AppSyncParameters"] = p.clone();
177    }
178    if let Some(ref p) = t.run_command_parameters {
179        obj["RunCommandParameters"] = p.clone();
180    }
181    obj
182}
183
184pub(crate) fn find_tags_mut<'a>(
185    state: &'a mut crate::state::EventBridgeState,
186    arn: &str,
187) -> Result<&'a mut BTreeMap<String, String>, AwsServiceError> {
188    // Check buses
189    for bus in state.buses.values_mut() {
190        if bus.arn == arn {
191            return Ok(&mut bus.tags);
192        }
193    }
194    // Check rules
195    for rule in state.rules.values_mut() {
196        if rule.arn == arn {
197            return Ok(&mut rule.tags);
198        }
199    }
200
201    // Parse ARN to give better error messages
202    let error_msg = if arn.contains(":rule/") {
203        // Extract rule name and bus from ARN
204        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
205        if let Some(rule_path) = parts.first() {
206            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
207                format!("Rule {rule_name} does not exist on EventBus {bus}.")
208            } else {
209                format!("Rule {} does not exist on EventBus default.", rule_path)
210            }
211        } else {
212            format!("Resource {arn} not found.")
213        }
214    } else {
215        format!("Resource {arn} not found.")
216    };
217
218    Err(AwsServiceError::aws_error(
219        StatusCode::BAD_REQUEST,
220        "ResourceNotFoundException",
221        error_msg,
222    ))
223}
224
225pub(crate) fn find_tags<'a>(
226    state: &'a crate::state::EventBridgeState,
227    arn: &str,
228) -> Result<&'a BTreeMap<String, String>, AwsServiceError> {
229    for bus in state.buses.values() {
230        if bus.arn == arn {
231            return Ok(&bus.tags);
232        }
233    }
234    for rule in state.rules.values() {
235        if rule.arn == arn {
236            return Ok(&rule.tags);
237        }
238    }
239
240    let error_msg = if arn.contains(":rule/") {
241        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
242        if let Some(rule_path) = parts.first() {
243            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
244                format!("Rule {rule_name} does not exist on EventBus {bus}.")
245            } else {
246                format!("Rule {} does not exist on EventBus default.", rule_path)
247            }
248        } else {
249            format!("Resource {arn} not found.")
250        }
251    } else {
252        format!("Resource {arn} not found.")
253    };
254
255    Err(AwsServiceError::aws_error(
256        StatusCode::BAD_REQUEST,
257        "ResourceNotFoundException",
258        error_msg,
259    ))
260}
261
262pub(crate) fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
263    let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
264        AwsServiceError::aws_error(
265            StatusCode::BAD_REQUEST,
266            "InvalidEventPatternException",
267            "Event pattern is not valid. Reason: Invalid JSON",
268        )
269    })?;
270
271    validate_pattern_values(&parsed, "")?;
272    Ok(())
273}
274
275pub(crate) fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
276    match value {
277        Value::Object(obj) => {
278            for (key, val) in obj {
279                let new_path = if path.is_empty() {
280                    key.clone()
281                } else {
282                    format!("{path}.{key}")
283                };
284                match val {
285                    Value::Object(_) => validate_pattern_values(val, &new_path)?,
286                    Value::Array(_) => {} // Arrays are fine at leaf level
287                    _ => {
288                        return Err(AwsServiceError::aws_error(
289                            StatusCode::BAD_REQUEST,
290                            "InvalidEventPatternException",
291                            format!(
292                                "Event pattern is not valid. Reason: '{}' must be an object or an array",
293                                key
294                            ),
295                        ));
296                    }
297                }
298            }
299            Ok(())
300        }
301        _ => Ok(()),
302    }
303}
304
305pub(crate) fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
306    match auth_type {
307        "API_KEY" => {
308            let mut resp = json!({});
309            if let Some(api_key) = params.get("ApiKeyAuthParameters") {
310                resp["ApiKeyAuthParameters"] = json!({
311                    "ApiKeyName": api_key["ApiKeyName"],
312                });
313            }
314            resp
315        }
316        "BASIC" => {
317            let mut resp = json!({});
318            if let Some(basic) = params.get("BasicAuthParameters") {
319                resp["BasicAuthParameters"] = json!({
320                    "Username": basic["Username"],
321                });
322            }
323            resp
324        }
325        "OAUTH_CLIENT_CREDENTIALS" => {
326            let mut resp = json!({});
327            if let Some(oauth) = params.get("OAuthParameters") {
328                resp["OAuthParameters"] = json!({
329                    "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
330                    "HttpMethod": oauth["HttpMethod"],
331                    "ClientParameters": {
332                        "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
333                    },
334                });
335            }
336            resp
337        }
338        _ => params.clone(),
339    }
340}
341
342/// Match an event against an EventBridge event pattern.
343pub(crate) fn matches_pattern(
344    pattern_json: Option<&str>,
345    source: &str,
346    detail_type: &str,
347    detail: &str,
348    account: &str,
349    region: &str,
350    resources: &[String],
351) -> bool {
352    let pattern_json = match pattern_json {
353        Some(p) => p,
354        None => return true,
355    };
356
357    let pattern: Value = match serde_json::from_str(pattern_json) {
358        Ok(v) => v,
359        Err(_) => return false,
360    };
361
362    if !pattern.is_object() {
363        return false;
364    }
365
366    let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
367    let event = json!({
368        "source": source,
369        "detail-type": detail_type,
370        "detail": detail_value,
371        "account": account,
372        "region": region,
373        "resources": resources,
374    });
375
376    matches_value(&pattern, &event)
377}
378
379pub(crate) fn matches_value(pattern: &Value, event_value: &Value) -> bool {
380    match pattern {
381        Value::Object(obj) => {
382            // `$or` is a sibling-level alternation: any alternative pattern
383            // matched against this same event level passes the whole object.
384            if let Some(Value::Array(alternatives)) = obj.get("$or") {
385                return alternatives
386                    .iter()
387                    .any(|alt| matches_value(alt, event_value));
388            }
389            for (key, sub_pattern) in obj {
390                if key == "$or" {
391                    continue;
392                }
393                let sub_value = &event_value[key];
394                if !matches_value(sub_pattern, sub_value) {
395                    return false;
396                }
397            }
398            true
399        }
400        Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
401        _ => false,
402    }
403}
404
405pub(crate) fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
406    match pattern_elem {
407        Value::Object(obj) => {
408            if let Some(prefix_val) = obj.get("prefix") {
409                if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
410                    return actual.starts_with(prefix);
411                }
412                return false;
413            }
414            if let Some(suffix_val) = obj.get("suffix") {
415                if let (Some(suffix), Some(actual)) = (suffix_val.as_str(), event_value.as_str()) {
416                    return actual.ends_with(suffix);
417                }
418                return false;
419            }
420            if let Some(eqic_val) = obj.get("equals-ignore-case") {
421                if let (Some(expected), Some(actual)) = (eqic_val.as_str(), event_value.as_str()) {
422                    return expected.eq_ignore_ascii_case(actual);
423                }
424                return false;
425            }
426            if let Some(cidr_val) = obj.get("cidr") {
427                if let (Some(cidr), Some(actual)) = (cidr_val.as_str(), event_value.as_str()) {
428                    return cidr_matches(cidr, actual);
429                }
430                return false;
431            }
432            if let Some(wild_val) = obj.get("wildcard") {
433                if let (Some(pattern), Some(actual)) = (wild_val.as_str(), event_value.as_str()) {
434                    return wildcard_matches(pattern, actual);
435                }
436                return false;
437            }
438            if let Some(exists_val) = obj.get("exists") {
439                let should_exist = exists_val.as_bool().unwrap_or(true);
440                let does_exist = !event_value.is_null();
441                return should_exist == does_exist;
442            }
443            if let Some(anything_but_val) = obj.get("anything-but") {
444                return match anything_but_val {
445                    Value::String(s) => event_value.as_str() != Some(s.as_str()),
446                    Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
447                    Value::Number(_) => event_value != anything_but_val,
448                    // Nested-matcher negation, e.g.
449                    // {"anything-but": {"prefix": "x"}} matches when the
450                    // field does NOT start with "x". Previously the object
451                    // form always matched, so excluded events were still
452                    // delivered (bug-audit 2026-05-28, 1.14).
453                    Value::Object(nested) => {
454                        let fv = event_value.as_str();
455                        if let Some(p) = nested.get("prefix").and_then(|v| v.as_str()) {
456                            !fv.is_some_and(|s| s.starts_with(p))
457                        } else if let Some(suf) = nested.get("suffix").and_then(|v| v.as_str()) {
458                            !fv.is_some_and(|s| s.ends_with(suf))
459                        } else if let Some(eic) =
460                            nested.get("equals-ignore-case").and_then(|v| v.as_str())
461                        {
462                            !fv.is_some_and(|s| s.eq_ignore_ascii_case(eic))
463                        } else {
464                            // Unknown nested matcher: default to matching.
465                            true
466                        }
467                    }
468                    _ => true,
469                };
470            }
471            if let Some(numeric_val) = obj.get("numeric") {
472                return matches_numeric(numeric_val, event_value);
473            }
474            false
475        }
476        _ => values_equal(pattern_elem, event_value),
477    }
478}
479
480/// `wildcard` matcher: `*` matches any run of characters (including empty);
481/// `\*` is a literal asterisk.
482pub(crate) fn wildcard_matches(pattern: &str, actual: &str) -> bool {
483    let mut segments: Vec<String> = Vec::new();
484    let mut current = String::new();
485    let mut chars = pattern.chars();
486    while let Some(c) = chars.next() {
487        if c == '\\' {
488            if let Some(next) = chars.next() {
489                current.push(next);
490            }
491        } else if c == '*' {
492            segments.push(std::mem::take(&mut current));
493        } else {
494            current.push(c);
495        }
496    }
497    segments.push(current);
498
499    if segments.len() == 1 {
500        return segments[0] == actual;
501    }
502
503    let mut pos = 0;
504    let first = &segments[0];
505    if !actual[pos..].starts_with(first.as_str()) {
506        return false;
507    }
508    pos += first.len();
509
510    let last_idx = segments.len() - 1;
511    for (i, seg) in segments.iter().enumerate().skip(1) {
512        if i == last_idx {
513            // The trailing segment must match the tail.
514            if !actual[pos..].ends_with(seg.as_str()) {
515                return false;
516            }
517            return actual.len().saturating_sub(pos) >= seg.len();
518        }
519        match actual[pos..].find(seg.as_str()) {
520            Some(idx) => pos += idx + seg.len(),
521            None => return false,
522        }
523    }
524    true
525}
526
527/// IPv4 CIDR membership test for the `cidr` filter.
528pub(crate) fn cidr_matches(cidr: &str, actual: &str) -> bool {
529    let (net_str, prefix_str) = match cidr.split_once('/') {
530        Some(parts) => parts,
531        None => return false,
532    };
533    let prefix: u32 = match prefix_str.parse() {
534        Ok(p) if p <= 32 => p,
535        _ => return false,
536    };
537    let net = match parse_ipv4(net_str) {
538        Some(n) => n,
539        None => return false,
540    };
541    let value = match parse_ipv4(actual) {
542        Some(v) => v,
543        None => return false,
544    };
545    if prefix == 0 {
546        return true;
547    }
548    let mask = u32::MAX << (32 - prefix);
549    (net & mask) == (value & mask)
550}
551
552fn parse_ipv4(s: &str) -> Option<u32> {
553    let mut parts = s.split('.');
554    let mut result: u32 = 0;
555    for _ in 0..4 {
556        let octet: u32 = parts.next()?.parse().ok()?;
557        if octet > 255 {
558            return None;
559        }
560        result = (result << 8) | octet;
561    }
562    if parts.next().is_some() {
563        return None;
564    }
565    Some(result)
566}
567
568/// For each archive on `event_bus_name` whose event pattern matches the
569/// event, append a clone of it to the archive's stored events and bump
570/// the archive's counters.
571#[allow(clippy::too_many_arguments)]
572pub(crate) fn archive_matching_event(
573    state: &mut crate::state::EventBridgeState,
574    event: &PutEvent,
575    event_bus_name: &str,
576    source: &str,
577    detail_type: &str,
578    detail: &str,
579    account_id: &str,
580    region: &str,
581    resources: &[String],
582) {
583    let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
584    for akey in archive_keys {
585        let (archive_bus, archive_pattern, archive_enabled) = {
586            let a = &state.archives[&akey];
587            (
588                state.resolve_bus_name(&a.event_source_arn),
589                a.event_pattern.clone(),
590                a.state == "ENABLED",
591            )
592        };
593        if archive_bus != event_bus_name || !archive_enabled {
594            continue;
595        }
596        let pattern_matches = matches_pattern(
597            archive_pattern.as_deref(),
598            source,
599            detail_type,
600            detail,
601            account_id,
602            region,
603            resources,
604        );
605        if !pattern_matches {
606            continue;
607        }
608        if let Some(archive) = state.archives.get_mut(&akey) {
609            archive.event_count += 1;
610            archive.size_bytes += detail.len() as i64;
611            archive.events.push(event.clone());
612        }
613    }
614}
615
616/// Walk the named archive, filter events into the replay window, then
617/// fan out each event against rules on `bus_name` to collect its
618/// matching targets. Returns only events that matched at least one
619/// target.
620#[allow(clippy::too_many_arguments)]
621pub(crate) fn collect_replay_events_with_targets(
622    state: &crate::state::EventBridgeState,
623    archive_name: &str,
624    bus_name: &str,
625    event_start_time: DateTime<Utc>,
626    event_end_time: DateTime<Utc>,
627    account_id: &str,
628    region: &str,
629) -> Vec<(PutEvent, Vec<EventTarget>)> {
630    let Some(archive) = state.archives.get(archive_name) else {
631        return Vec::new();
632    };
633
634    let replay_events: Vec<PutEvent> = archive
635        .events
636        .iter()
637        .filter(|e| e.time >= event_start_time && e.time < event_end_time)
638        .cloned()
639        .collect();
640
641    let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
642    for event in replay_events {
643        let matching_targets: Vec<EventTarget> = state
644            .rules
645            .values()
646            .filter(|r| {
647                r.event_bus_name == bus_name
648                    && r.state == "ENABLED"
649                    && matches_pattern(
650                        r.event_pattern.as_deref(),
651                        &event.source,
652                        &event.detail_type,
653                        &event.detail,
654                        account_id,
655                        region,
656                        &event.resources,
657                    )
658            })
659            .flat_map(|r| r.targets.clone())
660            .collect();
661
662        if !matching_targets.is_empty() {
663            events_to_deliver.push((event, matching_targets));
664        }
665    }
666    events_to_deliver
667}
668
669pub(crate) fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
670    let arr = match numeric_arr.as_array() {
671        Some(a) => a,
672        None => return false,
673    };
674    let actual = match event_value.as_f64() {
675        Some(n) => n,
676        None => return false,
677    };
678    let mut i = 0;
679    while i + 1 < arr.len() {
680        let op = match arr[i].as_str() {
681            Some(s) => s,
682            None => return false,
683        };
684        let threshold = match arr[i + 1].as_f64() {
685            Some(n) => n,
686            None => return false,
687        };
688        let ok = match op {
689            ">" => actual > threshold,
690            ">=" => actual >= threshold,
691            "<" => actual < threshold,
692            "<=" => actual <= threshold,
693            "=" => (actual - threshold).abs() < f64::EPSILON,
694            _ => return false,
695        };
696        if !ok {
697            return false;
698        }
699        i += 2;
700    }
701    true
702}
703
704pub(crate) fn values_equal(a: &Value, b: &Value) -> bool {
705    a == b
706}
707
708/// Resolve a simple JSON path like `$.detail.name` against an event JSON value.
709pub(crate) fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
710    let path = path.strip_prefix('$').unwrap_or(path);
711    let mut current = event;
712    for segment in path.split('.') {
713        if segment.is_empty() {
714            continue;
715        }
716        current = current.get(segment)?;
717    }
718    Some(current.clone())
719}
720
721/// Apply an EventBridge InputTransformer to an event.
722pub(crate) fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
723    let input_paths_map = transformer
724        .get("InputPathsMap")
725        .and_then(|v| v.as_object())
726        .cloned()
727        .unwrap_or_default();
728    let template = transformer
729        .get("InputTemplate")
730        .and_then(|v| v.as_str())
731        .unwrap_or("")
732        .to_string();
733
734    // Resolve all input paths
735    let mut resolved: HashMap<String, Value> = HashMap::new();
736    for (var_name, path_val) in &input_paths_map {
737        if let Some(path_str) = path_val.as_str() {
738            if let Some(val) = resolve_json_path(event, path_str) {
739                resolved.insert(var_name.clone(), val);
740            }
741        }
742    }
743
744    // Replace <varName> placeholders in template
745    let mut result = template;
746    for (var_name, val) in &resolved {
747        let placeholder = format!("<{var_name}>");
748        let replacement = match val {
749            Value::String(s) => s.clone(),
750            other => other.to_string(),
751        };
752        result = result.replace(&placeholder, &replacement);
753    }
754
755    result
756}
757
758pub(crate) fn missing(name: &str) -> AwsServiceError {
759    AwsServiceError::aws_error(
760        StatusCode::BAD_REQUEST,
761        "ValidationException",
762        format!("The request must contain the parameter {name}"),
763    )
764}
765
766/// Extract a Lambda function name from its ARN.
767///
768/// Handles both unqualified (`arn:aws:lambda:region:account:function:NAME`)
769/// and qualified (`arn:aws:lambda:region:account:function:NAME:alias`) ARNs.
770pub(crate) fn function_name_from_arn(arn: &str) -> &str {
771    let parts: Vec<&str> = arn.split(':').collect();
772    if parts.len() >= 7 && parts[5] == "function" {
773        parts[6]
774    } else {
775        arn
776    }
777}
778
779/// Spawn a background task to invoke a Lambda function via ContainerRuntime.
780/// This is fire-and-forget: EventBridge delivery is asynchronous.
781pub(crate) fn invoke_lambda_async(
782    container_runtime: &Option<Arc<ContainerRuntime>>,
783    lambda_state: &Option<SharedLambdaState>,
784    function_arn: &str,
785    payload: &str,
786) {
787    let runtime = match container_runtime {
788        Some(rt) => rt.clone(),
789        None => return,
790    };
791    let lambda_state = match lambda_state {
792        Some(ls) => ls.clone(),
793        None => return,
794    };
795    let func_name = function_name_from_arn(function_arn).to_string();
796    let payload = payload.as_bytes().to_vec();
797
798    tokio::spawn(async move {
799        let resolved = {
800            let accounts = lambda_state.read();
801            let state = accounts.default_ref();
802            state.functions.get(&func_name).cloned().map(|func| {
803                let mut layer_zips: Vec<Vec<u8>> = Vec::with_capacity(func.layers.len());
804                for attached in &func.layers {
805                    if let Some(bytes) = fakecloud_lambda::extras::parse_layer_version_arn(
806                        &attached.arn,
807                    )
808                    .and_then(|(acct, name, ver)| {
809                        accounts
810                            .get(&acct)
811                            .and_then(|s| s.layers.get(&name))
812                            .and_then(|l| l.versions.iter().find(|v| v.version == ver))
813                            .and_then(|v| v.code_zip.clone())
814                    }) {
815                        layer_zips.push(bytes);
816                    }
817                }
818                (func, layer_zips)
819            })
820        };
821        let (func, layer_zips) = match resolved {
822            Some(pair) => pair,
823            None => {
824                tracing::warn!(
825                    function = %func_name,
826                    "EventBridge Lambda target not found, skipping invocation"
827                );
828                return;
829            }
830        };
831        match runtime.invoke(&func, &payload, &layer_zips).await {
832            Ok(_) => {
833                tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
834            }
835            Err(e) => {
836                tracing::warn!(
837                    function = %func_name,
838                    error = %e,
839                    "EventBridge Lambda invocation failed"
840                );
841            }
842        }
843    });
844}
845
846/// Deliver an EventBridge event to CloudWatch Logs by writing a log event
847/// to the appropriate log group and stream.
848pub(crate) fn deliver_to_logs(
849    logs_state: &SharedLogsState,
850    log_group_arn: &str,
851    payload: &str,
852    timestamp: chrono::DateTime<chrono::Utc>,
853) {
854    // Extract log group name from ARN: arn:aws:logs:region:account:log-group:NAME
855    // or just the name if it's not an ARN
856    let group_name = if log_group_arn.contains(":log-group:") {
857        log_group_arn
858            .split(":log-group:")
859            .nth(1)
860            .unwrap_or(log_group_arn)
861            .trim_end_matches(":*")
862    } else {
863        log_group_arn
864    };
865
866    let stream_name = "events".to_string();
867    let ts_millis = timestamp.timestamp_millis();
868
869    let mut accounts = logs_state.write();
870    let state = accounts.default_mut();
871    let region = state.region.clone();
872    let account_id = state.account_id.clone();
873
874    // Auto-create log group and stream if they don't exist
875    let group = state
876        .log_groups
877        .entry(group_name.to_string())
878        .or_insert_with(|| fakecloud_logs::LogGroup {
879            name: group_name.to_string(),
880            arn: Arn::new(
881                "logs",
882                &region,
883                &account_id,
884                &format!("log-group:{group_name}"),
885            )
886            .to_string(),
887            creation_time: ts_millis,
888            retention_in_days: None,
889            kms_key_id: None,
890            tags: std::collections::BTreeMap::new(),
891            log_streams: std::collections::BTreeMap::new(),
892            stored_bytes: 0,
893            subscription_filters: Vec::new(),
894            data_protection_policy: None,
895            index_policies: Vec::new(),
896            transformer: None,
897            deletion_protection: false,
898            log_group_class: Some("STANDARD".to_string()),
899        });
900
901    let stream = group
902        .log_streams
903        .entry(stream_name.clone())
904        .or_insert_with(|| fakecloud_logs::LogStream {
905            name: stream_name,
906            arn: format!("{}:log-stream:events", group.arn),
907            creation_time: ts_millis,
908            first_event_timestamp: None,
909            last_event_timestamp: None,
910            last_ingestion_time: None,
911            upload_sequence_token: "1".to_string(),
912            events: Vec::new(),
913        });
914
915    stream.events.push(fakecloud_logs::LogEvent {
916        timestamp: ts_millis,
917        message: payload.to_string(),
918        ingestion_time: ts_millis,
919    });
920    stream.last_event_timestamp = Some(ts_millis);
921    stream.last_ingestion_time = Some(ts_millis);
922    if stream.first_event_timestamp.is_none() {
923        stream.first_event_timestamp = Some(ts_millis);
924    }
925}
926
927/// Apply connection auth parameters to an outgoing HTTP request.
928pub(crate) fn apply_connection_auth(
929    mut builder: reqwest::RequestBuilder,
930    conn: &Connection,
931) -> reqwest::RequestBuilder {
932    match conn.authorization_type.as_str() {
933        "API_KEY" => {
934            if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
935                if let (Some(name), Some(value)) = (
936                    params["ApiKeyName"].as_str(),
937                    params["ApiKeyValue"].as_str(),
938                ) {
939                    builder = builder.header(name, value);
940                }
941            }
942        }
943        "BASIC" => {
944            if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
945                if let (Some(user), Some(pass)) =
946                    (params["Username"].as_str(), params["Password"].as_str())
947                {
948                    builder = builder.basic_auth(user, Some(pass));
949                }
950            }
951        }
952        "OAUTH_CLIENT_CREDENTIALS" => {
953            // For OAuth, in a real implementation we'd exchange credentials for a token.
954            // Here we pass client credentials as basic auth as a reasonable approximation.
955            if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
956                if let (Some(client_id), Some(client_secret)) = (
957                    params["ClientParameters"]["ClientID"].as_str(),
958                    params["ClientParameters"]["ClientSecret"].as_str(),
959                ) {
960                    builder = builder.basic_auth(client_id, Some(client_secret));
961                }
962            }
963        }
964        _ => {}
965    }
966    builder
967}
968
969/// Context shared by both put_events (direct) and put_event_in_account
970/// (cross-service) when dispatching matched targets. Optional state
971/// handles let cross-service callers (which may not be wired with full
972/// service plumbing) gracefully degrade — e.g. Lambda dispatch becomes
973/// a fire-and-forget log unless `lambda_state` is wired.
974pub(crate) struct EventDispatchContext<'a> {
975    pub(crate) state: &'a crate::state::SharedEventBridgeState,
976    pub(crate) delivery: &'a std::sync::Arc<fakecloud_core::delivery::DeliveryBus>,
977    pub(crate) lambda_state: Option<&'a fakecloud_lambda::SharedLambdaState>,
978    pub(crate) logs_state: Option<&'a fakecloud_logs::SharedLogsState>,
979    pub(crate) container_runtime:
980        &'a Option<std::sync::Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
981    pub(crate) account_id: &'a str,
982    pub(crate) region: &'a str,
983}
984
985/// Single-target dispatch shared by direct PutEvents and cross-service
986/// put_event_in_account so both honour the same target shape (SQS/SNS/
987/// Lambda/Logs/Kinesis/StepFunctions/ApiDestination/HTTP) and the same
988/// InputTransformer + InputPath body resolution.
989pub(crate) fn dispatch_event_target(
990    ctx: &EventDispatchContext,
991    target: &crate::state::EventTarget,
992    event_json: &Value,
993    event_id: &str,
994    detail_type: &str,
995) {
996    let arn = &target.arn;
997    let event_str = event_json.to_string();
998    let body_str = if let Some(ref transformer) = target.input_transformer {
999        apply_input_transformer(transformer, event_json)
1000    } else if let Some(ref input) = target.input {
1001        input.clone()
1002    } else if let Some(ref input_path) = target.input_path {
1003        resolve_json_path(event_json, input_path)
1004            .map(|v| v.to_string())
1005            .unwrap_or_else(|| event_str.clone())
1006    } else {
1007        event_str.clone()
1008    };
1009
1010    if arn.contains(":sqs:") {
1011        let group_id = target
1012            .sqs_parameters
1013            .as_ref()
1014            .and_then(|p| p["MessageGroupId"].as_str())
1015            .map(|s| s.to_string());
1016        if group_id.is_some() {
1017            ctx.delivery.send_to_sqs_with_attrs(
1018                arn,
1019                &body_str,
1020                &HashMap::new(),
1021                group_id.as_deref(),
1022                None,
1023            );
1024        } else {
1025            ctx.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
1026        }
1027    } else if arn.contains(":sns:") {
1028        ctx.delivery
1029            .publish_to_sns(arn, &body_str, Some(detail_type));
1030    } else if arn.contains(":lambda:") {
1031        tracing::info!(
1032            function_arn = %arn,
1033            payload = %body_str,
1034            "EventBridge delivering to Lambda function"
1035        );
1036        let now = chrono::Utc::now();
1037        {
1038            let mut accounts = ctx.state.write();
1039            let s = accounts.get_or_create(ctx.account_id);
1040            s.lambda_invocations.push(crate::state::LambdaInvocation {
1041                function_arn: arn.clone(),
1042                payload: body_str.clone(),
1043                timestamp: now,
1044            });
1045        }
1046        if let Some(ls) = ctx.lambda_state {
1047            ls.write()
1048                .default_mut()
1049                .invocations
1050                .push(fakecloud_lambda::LambdaInvocation {
1051                    function_arn: arn.clone(),
1052                    payload: body_str.clone(),
1053                    timestamp: now,
1054                    source: "aws:events".to_string(),
1055                });
1056        }
1057        invoke_lambda_async(
1058            ctx.container_runtime,
1059            &ctx.lambda_state.cloned(),
1060            arn,
1061            &body_str,
1062        );
1063    } else if arn.contains(":logs:") {
1064        tracing::info!(
1065            log_group_arn = %arn,
1066            payload = %body_str,
1067            "EventBridge delivering to CloudWatch Logs"
1068        );
1069        let now = chrono::Utc::now();
1070        {
1071            let mut accounts = ctx.state.write();
1072            let s = accounts.get_or_create(ctx.account_id);
1073            s.log_deliveries.push(crate::state::LogDelivery {
1074                log_group_arn: arn.clone(),
1075                payload: body_str.clone(),
1076                timestamp: now,
1077            });
1078        }
1079        if let Some(log_state) = ctx.logs_state {
1080            deliver_to_logs(log_state, arn, &body_str, now);
1081        }
1082    } else if arn.contains(":kinesis:") {
1083        tracing::info!(
1084            stream_arn = %arn,
1085            "EventBridge delivering to Kinesis stream"
1086        );
1087        ctx.delivery.send_to_kinesis(arn, &body_str, event_id);
1088    } else if arn.contains(":states:") {
1089        tracing::info!(
1090            state_machine_arn = %arn,
1091            "EventBridge delivering to Step Functions"
1092        );
1093        ctx.delivery.start_stepfunctions_execution(arn, &body_str);
1094        let mut accounts = ctx.state.write();
1095        let s = accounts.get_or_create(ctx.account_id);
1096        s.step_function_executions
1097            .push(crate::state::StepFunctionExecution {
1098                state_machine_arn: arn.clone(),
1099                payload: body_str.clone(),
1100                timestamp: chrono::Utc::now(),
1101            });
1102    } else if arn.contains(":api-destination/") {
1103        let accounts = ctx.state.read();
1104        let empty = crate::state::EventBridgeState::new(ctx.account_id, ctx.region);
1105        let s = accounts.get(ctx.account_id).unwrap_or(&empty);
1106        let dest = s.api_destinations.values().find(|d| d.arn == *arn).cloned();
1107        let conn = dest.as_ref().and_then(|d| {
1108            s.connections
1109                .values()
1110                .find(|c| c.arn == d.connection_arn)
1111                .cloned()
1112        });
1113        drop(accounts);
1114        if let Some(dest) = dest {
1115            let url = dest.invocation_endpoint;
1116            let method = dest.http_method;
1117            let payload = body_str.clone();
1118            tokio::spawn(async move {
1119                let client = reqwest::Client::new();
1120                let mut req_builder = match method.as_str() {
1121                    "GET" => client.get(&url),
1122                    "PUT" => client.put(&url),
1123                    "DELETE" => client.delete(&url),
1124                    "PATCH" => client.patch(&url),
1125                    "HEAD" => client.head(&url),
1126                    _ => client.post(&url),
1127                };
1128                req_builder = req_builder.header("Content-Type", "application/json");
1129                if let Some(conn) = conn {
1130                    req_builder = apply_connection_auth(req_builder, &conn);
1131                }
1132                let result = req_builder.body(payload).send().await;
1133                if let Err(e) = result {
1134                    tracing::warn!(
1135                        endpoint = %url,
1136                        error = %e,
1137                        "EventBridge ApiDestination delivery failed"
1138                    );
1139                }
1140            });
1141        }
1142    } else if arn.starts_with("https://") || arn.starts_with("http://") {
1143        let url = arn.clone();
1144        let payload = body_str.clone();
1145        tokio::spawn(async move {
1146            let client = reqwest::Client::new();
1147            let result = client
1148                .post(&url)
1149                .header("Content-Type", "application/json")
1150                .body(payload)
1151                .send()
1152                .await;
1153            if let Err(e) = result {
1154                tracing::warn!(
1155                    endpoint = %url,
1156                    error = %e,
1157                    "EventBridge HTTP target delivery failed"
1158                );
1159            }
1160        });
1161    }
1162}