use chrono::Utc;
use crate::state::{LogEvent, LogGroup, LogStream, SharedLogsState};
#[derive(Debug, Clone)]
pub struct IngestEvent {
pub timestamp_ms: i64,
pub message: String,
}
pub fn append_events(
state: &SharedLogsState,
account_id: &str,
region: &str,
group_name: &str,
stream_name: &str,
events: &[IngestEvent],
) {
if events.is_empty() {
return;
}
let now = Utc::now().timestamp_millis();
let mut accounts = state.write();
let logs = accounts.get_or_create(account_id);
let group = logs
.log_groups
.entry(group_name.to_string())
.or_insert_with(|| LogGroup {
name: group_name.to_string(),
arn: format!(
"arn:aws:logs:{region}:{account_id}:log-group:{group_name}",
region = region,
account_id = account_id,
),
creation_time: now,
retention_in_days: None,
kms_key_id: None,
tags: Default::default(),
log_streams: Default::default(),
stored_bytes: 0,
subscription_filters: Vec::new(),
data_protection_policy: None,
index_policies: Vec::new(),
transformer: None,
deletion_protection: false,
log_group_class: Some("STANDARD".into()),
});
let stream = group
.log_streams
.entry(stream_name.to_string())
.or_insert_with(|| LogStream {
name: stream_name.to_string(),
arn: format!("{}:log-stream:{}", group.arn, stream_name),
creation_time: now,
first_event_timestamp: None,
last_event_timestamp: None,
last_ingestion_time: None,
upload_sequence_token: uuid::Uuid::new_v4().simple().to_string(),
events: Vec::new(),
});
for e in events {
if stream.first_event_timestamp.is_none() {
stream.first_event_timestamp = Some(e.timestamp_ms);
}
stream.last_event_timestamp = Some(
stream
.last_event_timestamp
.map(|t| t.max(e.timestamp_ms))
.unwrap_or(e.timestamp_ms),
);
stream.last_ingestion_time = Some(now);
group.stored_bytes += e.message.len() as i64;
stream.events.push(LogEvent {
timestamp: e.timestamp_ms,
message: e.message.clone(),
ingestion_time: now,
});
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use fakecloud_core::multi_account::MultiAccountState;
use parking_lot::RwLock;
use super::*;
#[test]
fn append_creates_group_and_stream_then_appends() {
let state = Arc::new(RwLock::new(
MultiAccountState::<crate::state::LogsState>::new(
"123456789012",
"us-east-1",
"http://localhost:4566",
),
));
append_events(
&state,
"123456789012",
"us-east-1",
"/ecs/svc",
"app/123",
&[
IngestEvent {
timestamp_ms: 1_000,
message: "hello".into(),
},
IngestEvent {
timestamp_ms: 2_000,
message: "world".into(),
},
],
);
let s = state.read();
let logs = s.get("123456789012").unwrap();
let group = logs.log_groups.get("/ecs/svc").unwrap();
let stream = group.log_streams.get("app/123").unwrap();
assert_eq!(stream.events.len(), 2);
assert_eq!(stream.events[0].message, "hello");
assert_eq!(stream.first_event_timestamp, Some(1_000));
assert_eq!(stream.last_event_timestamp, Some(2_000));
assert!(group.stored_bytes >= "hello".len() as i64 + "world".len() as i64);
}
#[test]
fn append_no_op_for_empty() {
let state = Arc::new(RwLock::new(
MultiAccountState::<crate::state::LogsState>::new(
"123456789012",
"us-east-1",
"http://localhost:4566",
),
));
append_events(&state, "123456789012", "us-east-1", "g", "s", &[]);
let s = state.read();
assert!(s
.get("123456789012")
.is_none_or(|a| a.log_groups.is_empty()));
}
}