1use chrono::Utc;
11
12use crate::state::{LogEvent, LogGroup, LogStream, SharedLogsState};
13
/// One log line handed to [`append_events`] for storage in a log stream.
#[derive(Clone, Debug)]
pub struct IngestEvent {
    /// Timestamp of the event in milliseconds; copied verbatim into the
    /// stored event (presumably Unix-epoch based — confirm with callers).
    pub timestamp_ms: i64,
    /// Raw message text of the event.
    pub message: String,
}
20
21pub fn append_events(
26 state: &SharedLogsState,
27 account_id: &str,
28 region: &str,
29 group_name: &str,
30 stream_name: &str,
31 events: &[IngestEvent],
32) {
33 if events.is_empty() {
34 return;
35 }
36 let now = Utc::now().timestamp_millis();
37 let mut accounts = state.write();
38 let logs = accounts.get_or_create(account_id);
39 let group = logs
40 .log_groups
41 .entry(group_name.to_string())
42 .or_insert_with(|| LogGroup {
43 name: group_name.to_string(),
44 arn: format!(
45 "arn:aws:logs:{region}:{account_id}:log-group:{group_name}",
46 region = region,
47 account_id = account_id,
48 ),
49 creation_time: now,
50 retention_in_days: None,
51 kms_key_id: None,
52 tags: Default::default(),
53 log_streams: Default::default(),
54 stored_bytes: 0,
55 subscription_filters: Vec::new(),
56 data_protection_policy: None,
57 index_policies: Vec::new(),
58 transformer: None,
59 deletion_protection: false,
60 log_group_class: Some("STANDARD".into()),
61 });
62 let stream = group
63 .log_streams
64 .entry(stream_name.to_string())
65 .or_insert_with(|| LogStream {
66 name: stream_name.to_string(),
67 arn: format!("{}:log-stream:{}", group.arn, stream_name),
68 creation_time: now,
69 first_event_timestamp: None,
70 last_event_timestamp: None,
71 last_ingestion_time: None,
72 upload_sequence_token: uuid::Uuid::new_v4().simple().to_string(),
73 events: Vec::new(),
74 });
75
76 for e in events {
77 if stream.first_event_timestamp.is_none() {
78 stream.first_event_timestamp = Some(e.timestamp_ms);
79 }
80 stream.last_event_timestamp = Some(
81 stream
82 .last_event_timestamp
83 .map(|t| t.max(e.timestamp_ms))
84 .unwrap_or(e.timestamp_ms),
85 );
86 stream.last_ingestion_time = Some(now);
87 group.stored_bytes += e.message.len() as i64;
88 stream.events.push(LogEvent {
89 timestamp: e.timestamp_ms,
90 message: e.message.clone(),
91 ingestion_time: now,
92 });
93 }
94}
95
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use fakecloud_core::multi_account::MultiAccountState;
    use parking_lot::RwLock;

    use super::*;

    /// Account id shared by every test in this module.
    const ACCOUNT: &str = "123456789012";
    /// Region shared by every test in this module.
    const REGION: &str = "us-east-1";

    /// Builds a fresh shared state, constructed identically for each test.
    fn fresh_state() -> Arc<RwLock<MultiAccountState<crate::state::LogsState>>> {
        let inner = MultiAccountState::<crate::state::LogsState>::new(
            ACCOUNT,
            REGION,
            "http://localhost:4566",
        );
        Arc::new(RwLock::new(inner))
    }

    /// A first append must create the group and stream and record both events.
    #[test]
    fn append_creates_group_and_stream_then_appends() {
        let state = fresh_state();
        let batch = [
            IngestEvent {
                timestamp_ms: 1_000,
                message: "hello".into(),
            },
            IngestEvent {
                timestamp_ms: 2_000,
                message: "world".into(),
            },
        ];
        append_events(&state, ACCOUNT, REGION, "/ecs/svc", "app/123", &batch);

        let guard = state.read();
        let account_logs = guard.get(ACCOUNT).unwrap();
        let group = account_logs.log_groups.get("/ecs/svc").unwrap();
        let stream = group.log_streams.get("app/123").unwrap();

        assert_eq!(stream.events.len(), 2);
        assert_eq!(stream.events[0].message, "hello");
        assert_eq!(stream.first_event_timestamp, Some(1_000));
        assert_eq!(stream.last_event_timestamp, Some(2_000));
        let minimum_bytes = "hello".len() as i64 + "world".len() as i64;
        assert!(group.stored_bytes >= minimum_bytes);
    }

    /// An empty batch must not create any log group.
    #[test]
    fn append_no_op_for_empty() {
        let state = fresh_state();
        append_events(&state, ACCOUNT, REGION, "g", "s", &[]);

        let guard = state.read();
        let untouched = guard
            .get(ACCOUNT)
            .is_none_or(|a| a.log_groups.is_empty());
        assert!(untouched);
    }
}