Skip to main content

vantage_aws/models/logs/
stream.rs

1use serde::{Deserialize, Serialize};
2use vantage_table::table::Table;
3
4use crate::{AwsAccount, eq};
5
6use super::event::{LogEvent, events_table};
7
8/// One CloudWatch Logs stream from `DescribeLogStreams`. Field names
9/// match the wire shape.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct LogStream {
12    #[serde(rename = "logStreamName")]
13    pub log_stream_name: String,
14    #[serde(default)]
15    pub arn: String,
16    #[serde(rename = "creationTime", default)]
17    pub creation_time: i64,
18    #[serde(rename = "firstEventTimestamp", default)]
19    pub first_event_timestamp: i64,
20    #[serde(rename = "lastEventTimestamp", default)]
21    pub last_event_timestamp: i64,
22    #[serde(rename = "lastIngestionTime", default)]
23    pub last_ingestion_time: i64,
24    #[serde(rename = "storedBytes", default)]
25    pub stored_bytes: i64,
26    #[serde(rename = "uploadSequenceToken", default)]
27    pub upload_sequence_token: String,
28}
29
30/// `DescribeLogStreams` table. AWS requires `logGroupName` before it
31/// will list anything, so add `eq("logGroupName", "...")` first.
32///
33/// ```no_run
34/// # use vantage_aws::{AwsAccount, eq};
35/// # use vantage_aws::models::logs::streams_table;
36/// # async fn run() -> vantage_core::Result<()> {
37/// # let aws = AwsAccount::from_default()?;
38/// let mut streams = streams_table(aws);
39/// streams.add_condition(eq("logGroupName", "/aws/lambda/foo"));
40/// # Ok(()) }
41/// ```
42pub fn streams_table(aws: AwsAccount) -> Table<AwsAccount, LogStream> {
43    Table::new(
44        "json1/logStreams@nextToken:logs/Logs_20140328.DescribeLogStreams",
45        aws,
46    )
47    .with_id_column("logStreamName")
48    .with_column_of::<String>("arn")
49    .with_column_of::<i64>("creationTime")
50    .with_column_of::<i64>("firstEventTimestamp")
51    .with_title_column_of::<i64>("lastEventTimestamp")
52    .with_column_of::<i64>("lastIngestionTime")
53    .with_title_column_of::<i64>("storedBytes")
54    .with_column_of::<String>("uploadSequenceToken")
55}
56
57impl LogStream {
58    /// Build a [`streams_table`] narrowed to the stream identified by
59    /// `arn`. Accepts ARNs of the shape
60    /// `arn:aws:logs:<region>:<account>:log-group:<group>:log-stream:<stream>`.
61    pub fn from_arn(arn: &str, aws: AwsAccount) -> Option<Table<AwsAccount, LogStream>> {
62        let after_group = arn.split(":log-group:").nth(1)?;
63        let (group_name, stream_name) = after_group.split_once(":log-stream:")?;
64        if group_name.is_empty() || stream_name.is_empty() {
65            return None;
66        }
67        let mut t = streams_table(aws);
68        t.add_condition(eq("logGroupName", group_name.to_string()));
69        t.add_condition(eq("logStreamNamePrefix", stream_name.to_string()));
70        Some(t)
71    }
72
73    /// The owning log group's name, parsed out of [`Self::arn`].
74    /// Stream ARNs have the shape
75    /// `arn:aws:logs:<region>:<account>:log-group:<group>:log-stream:<stream>`.
76    pub fn log_group_name(&self) -> Option<&str> {
77        let after = self.arn.split(":log-group:").nth(1)?;
78        after.split(":log-stream:").next()
79    }
80
81    /// Events table pre-filtered to *this* stream. The log group is
82    /// pulled from this stream's ARN; if the ARN is empty, pass it
83    /// in via [`Self::ref_events_in`] instead.
84    pub fn ref_events(&self, aws: AwsAccount) -> Option<Table<AwsAccount, LogEvent>> {
85        let group = self.log_group_name()?;
86        Some(self.ref_events_in(aws, group))
87    }
88
89    /// Events table pre-filtered to *this* stream within the given
90    /// log group. Use when the stream ARN isn't populated (e.g.
91    /// streams synthesised by hand).
92    pub fn ref_events_in(
93        &self,
94        aws: AwsAccount,
95        log_group_name: &str,
96    ) -> Table<AwsAccount, LogEvent> {
97        let mut t = events_table(aws);
98        t.add_condition(eq("logGroupName", log_group_name));
99        t.add_condition(eq("logStreamNamePrefix", self.log_stream_name.clone()));
100        t
101    }
102}