#[cfg(feature = "runtime_support")]
use std::marker::Unpin;
#[cfg(feature = "runtime_support")]
use std::panic::AssertUnwindSafe;
use std::time::Duration;
#[cfg(feature = "runtime_support")]
use std::time::SystemTime;
#[cfg(feature = "runtime_support")]
use dfir_rs::Never;
#[cfg(feature = "runtime_support")]
use dfir_rs::scheduled::metrics::{DfirMetrics, DfirMetricsIntervals};
#[cfg(feature = "runtime_support")]
use futures::FutureExt;
use quote::quote;
#[cfg(feature = "runtime_support")]
use serde_json::json;
use syn::parse_quote;
#[cfg(feature = "runtime_support")]
use tokio::io::{AsyncWrite, AsyncWriteExt};
#[cfg(feature = "runtime_support")]
use tokio_metrics::RuntimeMetrics;
use crate::location::{LocationKey, LocationType};
use crate::staging_util::get_this_crate;
use crate::telemetry::Sidecar;
/// File that [`RecordMetricsSidecar`] appends EMF records to when no path is configured.
pub const DEFAULT_FILE_PATH: &str = "/var/log/hydro/metrics.log";
/// Time waited between metric-recording rounds when no interval is configured (30 seconds).
pub const DEFAULT_INTERVAL: Duration = Duration::new(30, 0);
/// Sidecar that periodically appends CloudWatch Embedded Metric Format (EMF) records
/// describing a location's DFIR and tokio runtime metrics to a file.
#[derive(Debug, Clone)]
pub struct RecordMetricsSidecar {
    /// Path of the file the EMF records are appended to.
    file_path: String,
    /// Time waited between successive recording rounds.
    interval: Duration,
}
#[buildstructor::buildstructor]
impl RecordMetricsSidecar {
    /// Creates a sidecar configuration.
    ///
    /// `file_path` falls back to [`DEFAULT_FILE_PATH`] and `interval` falls back to
    /// [`DEFAULT_INTERVAL`] when `None`. The `#[builder]` attribute lets `buildstructor`
    /// derive a builder from this signature.
    #[builder]
    pub fn new(file_path: Option<String>, interval: Option<Duration>) -> Self {
        let resolved_path = file_path.unwrap_or_else(|| String::from(DEFAULT_FILE_PATH));
        let resolved_interval = interval.unwrap_or(DEFAULT_INTERVAL);
        Self {
            interval: resolved_interval,
            file_path: resolved_path,
        }
    }
}
impl Sidecar for RecordMetricsSidecar {
    /// Builds the expression that launches the EMF metrics sidecar for one location.
    ///
    /// The generated code calls `telemetry::emf::record_metrics_sidecar`. It passes the DFIR
    /// instance's `metrics_intervals()` and, as the metric namespace, the flow name with
    /// every whitespace character replaced by `_`. It also passes the location name and this
    /// sidecar's file path and interval.
    fn to_expr(
        &self,
        flow_name: &str,
        _location_key: LocationKey,
        _location_type: LocationType,
        location_name: &str,
        dfir_ident: &syn::Ident,
    ) -> syn::Expr {
        let crate_root = get_this_crate();

        // `record_metrics_sidecar` asserts that the namespace contains no whitespace,
        // so sanitize it before embedding it in the generated code.
        let metrics_namespace: String = flow_name
            .chars()
            .map(|ch| if ch.is_whitespace() { '_' } else { ch })
            .collect();

        let file_path = &self.file_path;

        // `Duration` has no `ToTokens` impl, so emit an explicit `Duration::new(secs, nanos)`.
        let secs = self.interval.as_secs();
        let nanos = self.interval.subsec_nanos();
        let interval_expr = quote!(::std::time::Duration::new(#secs, #nanos));

        parse_quote! {
            #crate_root::telemetry::emf::record_metrics_sidecar(
                #dfir_ident.metrics_intervals(),
                #metrics_namespace,
                #location_name,
                #file_path,
                #interval_expr
            )
        }
    }
}
/// Returns a never-ending future that appends CloudWatch Embedded Metric Format (EMF)
/// records for this location's DFIR metrics and tokio runtime metrics to `file_path`.
///
/// Each loop iteration sleeps for `interval` and then takes one DFIR metrics interval and
/// one tokio runtime metrics interval. It appends them to the file as newline-delimited
/// JSON. An I/O error or panic during a round is reported on stderr and the loop continues.
///
/// # Panics
/// Panics immediately (before returning the future) if `namespace` contains whitespace.
#[cfg(feature = "runtime_support")]
#[doc(hidden)]
pub fn record_metrics_sidecar(
    mut dfir_intervals: DfirMetricsIntervals,
    namespace: &'static str,
    location_name: &'static str,
    file_path: &'static str,
    interval: Duration,
) -> impl 'static + Future<Output = Never> {
    assert!(!namespace.contains(char::is_whitespace));
    async move {
        // Best-effort: if this fails, opening the file below will likely fail too and be
        // reported on every round.
        if let Some(parent_dir) = std::path::Path::new(file_path).parent()
            && let Err(e) = tokio::fs::create_dir_all(parent_dir).await
        {
            eprintln!("Failed to create log file directory for EMF metrics: {}", e);
        }
        let rt_monitor = tokio_metrics::RuntimeMonitor::new(&tokio::runtime::Handle::current());
        let mut rt_intervals = rt_monitor.intervals();
        loop {
            tokio::time::sleep(interval).await;
            let dfir_metrics = dfir_intervals.take_interval();
            let rt_metrics = rt_intervals
                .next()
                .expect("tokio_metrics runtime intervals should never be exhausted");
            // Isolate panics inside a round so that one bad round cannot kill the sidecar.
            let round = AssertUnwindSafe(write_metrics_round(
                namespace,
                location_name,
                file_path,
                dfir_metrics,
                rt_metrics,
            ))
            .catch_unwind()
            .await;
            match round {
                Ok(Ok(())) => {}
                Ok(Err(e)) => {
                    eprintln!("Failed to write EMF metrics to {file_path}: {e}");
                }
                Err(panic_payload) => {
                    eprintln!(
                        "Panic in metrics sidecar: {}",
                        panic_payload_message(&*panic_payload)
                    );
                }
            }
        }
    }
}

/// Appends one round of EMF records (DFIR metrics, then tokio runtime metrics) to
/// `file_path`, all stamped with the same wall-clock timestamp.
///
/// # Errors
/// Returns the first I/O error from opening, writing, or flushing the file.
#[cfg(feature = "runtime_support")]
async fn write_metrics_round(
    namespace: &str,
    location_name: &str,
    file_path: &str,
    dfir_metrics: DfirMetrics,
    rt_metrics: RuntimeMetrics,
) -> Result<(), std::io::Error> {
    let timestamp = SystemTime::now();
    let file = tokio::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(false)
        .append(true)
        .open(file_path)
        .await?;
    let mut writer = tokio::io::BufWriter::new(file);
    record_metrics_dfir(namespace, location_name, timestamp, dfir_metrics, &mut writer).await?;
    record_metrics_tokio(namespace, location_name, timestamp, rt_metrics, &mut writer).await?;
    // `shutdown` flushes the buffered records into the file before the writer is dropped.
    writer.shutdown().await
}

/// Extracts a human-readable message from a panic payload.
///
/// `panic!("literal")` produces a `&'static str` payload and formatted panics produce a
/// `String`. Any other payload type yields a placeholder.
#[cfg(feature = "runtime_support")]
fn panic_payload_message(payload: &(dyn std::any::Any + Send)) -> &str {
    if let Some(msg) = payload.downcast_ref::<&'static str>() {
        *msg
    } else if let Some(msg) = payload.downcast_ref::<String>() {
        msg.as_str()
    } else {
        "<non-string panic payload>"
    }
}
/// Writes one EMF JSON record per handoff and one per subgraph in `metrics`, each
/// followed by a newline, all stamped with `timestamp` in milliseconds since the Unix epoch.
///
/// Handoff records report `CurrItemsCount` and `TotalItemsCount`. Subgraph records report
/// run, poll, and idle counts plus poll/idle durations in microseconds.
///
/// # Errors
/// Propagates the first I/O error returned while writing to `writer`.
///
/// # Panics
/// Panics if `timestamp` is earlier than the Unix epoch.
#[cfg(feature = "runtime_support")]
async fn record_metrics_dfir<W>(
    namespace: &str,
    location_name: &str,
    timestamp: SystemTime,
    metrics: DfirMetrics,
    writer: &mut W,
) -> Result<(), std::io::Error>
where
    W: AsyncWrite + Unpin,
{
    let since_epoch = timestamp.duration_since(SystemTime::UNIX_EPOCH).unwrap();
    let ts_millis = since_epoch.as_millis();

    for (handoff_id, handoff) in metrics.handoffs.iter() {
        let record = json!({
            "_aws": {
                "Timestamp": ts_millis,
                "CloudWatchMetrics": [
                    {
                        "Namespace": namespace,
                        "Dimensions": [["LocationName"], ["LocationName", "HandoffId"]],
                        "Metrics": [
                            {"Name": "CurrItemsCount", "Unit": Unit::Count},
                            {"Name": "TotalItemsCount", "Unit": Unit::Count},
                        ]
                    }
                ]
            },
            "LocationName": location_name,
            "HandoffId": handoff_id.to_string(),
            "CurrItemsCount": handoff.curr_items_count(),
            "TotalItemsCount": handoff.total_items_count(),
        });
        let line = record.to_string();
        writer.write_all(line.as_bytes()).await?;
        writer.write_u8(b'\n').await?;
    }

    for (subgraph_id, subgraph) in metrics.subgraphs.iter() {
        let record = json!({
            "_aws": {
                "Timestamp": ts_millis,
                "CloudWatchMetrics": [
                    {
                        "Namespace": namespace,
                        "Dimensions": [["LocationName"], ["LocationName", "SubgraphId"]],
                        "Metrics": [
                            {"Name": "TotalRunCount", "Unit": Unit::Count},
                            {"Name": "TotalPollDuration", "Unit": Unit::Microseconds},
                            {"Name": "TotalPollCount", "Unit": Unit::Count},
                            {"Name": "TotalIdleDuration", "Unit": Unit::Microseconds},
                            {"Name": "TotalIdleCount", "Unit": Unit::Count},
                        ]
                    }
                ]
            },
            "LocationName": location_name,
            "SubgraphId": subgraph_id.to_string(),
            "TotalRunCount": subgraph.total_run_count(),
            "TotalPollDuration": subgraph.total_poll_duration().as_micros(),
            "TotalPollCount": subgraph.total_poll_count(),
            "TotalIdleDuration": subgraph.total_idle_duration().as_micros(),
            "TotalIdleCount": subgraph.total_idle_count(),
        });
        let line = record.to_string();
        writer.write_all(line.as_bytes()).await?;
        writer.write_u8(b'\n').await?;
    }

    Ok(())
}
/// Writes a single EMF JSON record, followed by a newline, with the tokio runtime's
/// `TotalBusyDuration` (microseconds) and `GlobalQueueDepth`, stamped with `timestamp` in
/// milliseconds since the Unix epoch.
///
/// # Errors
/// Propagates the first I/O error returned while writing to `writer`.
///
/// # Panics
/// Panics if `timestamp` is earlier than the Unix epoch.
#[cfg(feature = "runtime_support")]
async fn record_metrics_tokio<W>(
    namespace: &str,
    location_name: &str,
    timestamp: SystemTime,
    rt_metrics: RuntimeMetrics,
    writer: &mut W,
) -> Result<(), std::io::Error>
where
    W: AsyncWrite + Unpin,
{
    let since_epoch = timestamp.duration_since(SystemTime::UNIX_EPOCH).unwrap();
    let ts_millis = since_epoch.as_millis();
    let busy_micros = rt_metrics.total_busy_duration.as_micros();
    let queue_depth = rt_metrics.global_queue_depth;

    let record = json!({
        "_aws": {
            "Timestamp": ts_millis,
            "CloudWatchMetrics": [
                {
                    "Namespace": namespace,
                    "Dimensions": [["LocationName"]],
                    "Metrics": [
                        {"Name": "TotalBusyDuration", "Unit": Unit::Microseconds},
                        {"Name": "GlobalQueueDepth", "Unit": Unit::Count},
                    ]
                }
            ]
        },
        "LocationName": location_name,
        "TotalBusyDuration": busy_micros,
        "GlobalQueueDepth": queue_depth,
    });
    let line = record.to_string();
    writer.write_all(line.as_bytes()).await?;
    writer.write_u8(b'\n').await?;
    Ok(())
}
/// Unit attached to a metric declaration in the `"Metrics"` list of an EMF
/// `CloudWatchMetrics` block.
///
/// Serialized by serde as the variant name, except the per-second rates, which serialize
/// as `"<Unit>/Second"`. Defaults to [`Unit::None`].
// NOTE(review): these spellings appear to mirror CloudWatch's standard unit names — confirm
// against the CloudWatch `MetricDatum.Unit` reference before adding or renaming variants.
#[expect(missing_docs, reason = "self-explanatory")]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
pub enum Unit {
    #[default]
    None,
    // Time units.
    Seconds,
    Microseconds,
    Milliseconds,
    // Data-size units.
    Bytes,
    Kilobytes,
    Megabytes,
    Gigabytes,
    Terabytes,
    Bits,
    Kilobits,
    Megabits,
    Gigabits,
    Terabits,
    Percent,
    Count,
    // Rate units: serialized with an explicit "/Second" suffix via `serde(rename)`.
    #[serde(rename = "Bytes/Second")]
    BytesPerSecond,
    #[serde(rename = "Kilobytes/Second")]
    KilobytesPerSecond,
    #[serde(rename = "Megabytes/Second")]
    MegabytesPerSecond,
    #[serde(rename = "Gigabytes/Second")]
    GigabytesPerSecond,
    #[serde(rename = "Terabytes/Second")]
    TerabytesPerSecond,
    #[serde(rename = "Bits/Second")]
    BitsPerSecond,
    #[serde(rename = "Kilobits/Second")]
    KilobitsPerSecond,
    #[serde(rename = "Megabits/Second")]
    MegabitsPerSecond,
    #[serde(rename = "Gigabits/Second")]
    GigabitsPerSecond,
    #[serde(rename = "Terabits/Second")]
    TerabitsPerSecond,
    #[serde(rename = "Count/Second")]
    CountPerSecond,
}