Skip to main content

fakecloud_logs/
ingest.rs

//! Same-process log ingestion for cross-service callers.
//!
//! ECS's `awslogs` driver and other in-tree producers (future Lambda
//! CloudWatch-Logs forwarding) need to write events into fakecloud-logs
//! without going through HTTP PutLogEvents. This module exposes a thin,
//! well-typed API that creates the log group/stream on demand and
//! appends events, mirroring the same validation shape as
//! `service::streams::put_log_events` but skipping `AwsRequest` parsing.
9
10use chrono::Utc;
11
12use crate::state::{LogEvent, LogGroup, LogStream, SharedLogsState};
13
/// An event destined for a CloudWatch Log stream.
///
/// This is the caller-facing input shape for in-process ingestion: a
/// timestamp plus the raw message text. Equality compares both fields,
/// which lets callers and tests assert on batches directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IngestEvent {
    /// Event time in milliseconds. Stored verbatim as the log event's
    /// timestamp (presumably Unix-epoch based, like CloudWatch Logs —
    /// confirm against producers).
    pub timestamp_ms: i64,
    /// Message text, stored verbatim as the log event's message.
    pub message: String,
}
20
21/// Ensure a log group and stream exist for `account_id`, then append
22/// `events` to the stream. Creates the group with no retention and the
23/// stream with a fresh upload sequence token if either is missing —
24/// matching what an `awslogs-create-group=true` driver would do.
25pub fn append_events(
26    state: &SharedLogsState,
27    account_id: &str,
28    region: &str,
29    group_name: &str,
30    stream_name: &str,
31    events: &[IngestEvent],
32) {
33    if events.is_empty() {
34        return;
35    }
36    let now = Utc::now().timestamp_millis();
37    let mut accounts = state.write();
38    let logs = accounts.get_or_create(account_id);
39    let group = logs
40        .log_groups
41        .entry(group_name.to_string())
42        .or_insert_with(|| LogGroup {
43            name: group_name.to_string(),
44            arn: format!(
45                "arn:aws:logs:{region}:{account_id}:log-group:{group_name}",
46                region = region,
47                account_id = account_id,
48            ),
49            creation_time: now,
50            retention_in_days: None,
51            kms_key_id: None,
52            tags: Default::default(),
53            log_streams: Default::default(),
54            stored_bytes: 0,
55            subscription_filters: Vec::new(),
56            data_protection_policy: None,
57            index_policies: Vec::new(),
58            transformer: None,
59            deletion_protection: false,
60            log_group_class: Some("STANDARD".into()),
61        });
62    let stream = group
63        .log_streams
64        .entry(stream_name.to_string())
65        .or_insert_with(|| LogStream {
66            name: stream_name.to_string(),
67            arn: format!("{}:log-stream:{}", group.arn, stream_name),
68            creation_time: now,
69            first_event_timestamp: None,
70            last_event_timestamp: None,
71            last_ingestion_time: None,
72            upload_sequence_token: uuid::Uuid::new_v4().simple().to_string(),
73            events: Vec::new(),
74        });
75
76    for e in events {
77        if stream.first_event_timestamp.is_none() {
78            stream.first_event_timestamp = Some(e.timestamp_ms);
79        }
80        stream.last_event_timestamp = Some(
81            stream
82                .last_event_timestamp
83                .map(|t| t.max(e.timestamp_ms))
84                .unwrap_or(e.timestamp_ms),
85        );
86        stream.last_ingestion_time = Some(now);
87        group.stored_bytes += e.message.len() as i64;
88        stream.events.push(LogEvent {
89            timestamp: e.timestamp_ms,
90            message: e.message.clone(),
91            ingestion_time: now,
92        });
93    }
94}
95
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use fakecloud_core::multi_account::MultiAccountState;
    use parking_lot::RwLock;

    use super::*;

    /// Account id used by every test in this module.
    const ACCOUNT: &str = "123456789012";

    /// Build a fresh, empty multi-account logs state, wrapped the same
    /// way each test hands it to `append_events`.
    fn empty_state() -> Arc<RwLock<MultiAccountState<crate::state::LogsState>>> {
        let inner = MultiAccountState::<crate::state::LogsState>::new(
            ACCOUNT,
            "us-east-1",
            "http://localhost:4566",
        );
        Arc::new(RwLock::new(inner))
    }

    /// Appending to a missing group/stream creates both and stores the
    /// events in order, updating the stream's timestamp bounds and the
    /// group's byte counter.
    #[test]
    fn append_creates_group_and_stream_then_appends() {
        let state = empty_state();
        let batch = [
            IngestEvent {
                timestamp_ms: 1_000,
                message: "hello".into(),
            },
            IngestEvent {
                timestamp_ms: 2_000,
                message: "world".into(),
            },
        ];

        append_events(&state, ACCOUNT, "us-east-1", "/ecs/svc", "app/123", &batch);

        let guard = state.read();
        let account = guard.get(ACCOUNT).unwrap();
        let group = account.log_groups.get("/ecs/svc").unwrap();
        let stream = group.log_streams.get("app/123").unwrap();

        assert_eq!(stream.events.len(), 2);
        assert_eq!(stream.events[0].message, "hello");
        assert_eq!(stream.first_event_timestamp, Some(1_000));
        assert_eq!(stream.last_event_timestamp, Some(2_000));

        // The counter must cover at least the raw message bytes.
        let min_bytes = "hello".len() as i64 + "world".len() as i64;
        assert!(group.stored_bytes >= min_bytes);
    }

    /// An empty batch must not create a group (the account entry itself
    /// may or may not exist).
    #[test]
    fn append_no_op_for_empty() {
        let state = empty_state();

        append_events(&state, ACCOUNT, "us-east-1", "g", "s", &[]);

        let guard = state.read();
        if let Some(account) = guard.get(ACCOUNT) {
            assert!(account.log_groups.is_empty());
        }
    }
}