use std::{
io::Read,
str::{from_utf8, FromStr},
sync::Arc,
};
use eyre::{eyre, Result};
use log::warn;
use ssf::MsgMailbox;
use tiny_http::{Method, Request, Response};
use crate::{
config::Config,
http_server::{HttpHandler, HttpHandlerResult, SessionRequest},
mar::MarConfig,
metrics::SessionName,
network::NetworkConfig,
};
use super::SessionEventMessage;
/// HTTP handler for the session start (`/v1/session/start`) and session end
/// (`/v1/session/end`) endpoints.
///
/// Parsed requests are turned into `SessionEventMessage`s and sent to
/// `session_events_mbox`.
#[derive(Clone)]
pub struct SessionEventHandler {
/// When `false`, matching requests are answered with an empty 200 response
/// without being parsed or forwarded.
data_collection_enabled: bool,
/// Mailbox that `StartSession` / `StopSession` messages are sent to.
session_events_mbox: MsgMailbox<SessionEventMessage>,
/// Network settings built from the `Config` passed to `new`; a clone of this
/// `Arc` is attached to every `StopSession` message.
network_config: Arc<NetworkConfig>,
/// MAR settings built from the `Config` passed to `new`; a clone of this
/// `Arc` is attached to every `StopSession` message.
mar_config: Arc<MarConfig>,
}
impl SessionEventHandler {
    /// Creates a handler that forwards session events to `session_events_mbox`.
    ///
    /// The network and MAR configurations are derived from `config` once, here,
    /// and shared behind `Arc`s afterwards.
    pub fn new(
        data_collection_enabled: bool,
        session_events_mbox: MsgMailbox<SessionEventMessage>,
        config: &Config,
    ) -> Self {
        Self {
            data_collection_enabled,
            session_events_mbox,
            network_config: Arc::new(NetworkConfig::from(config)),
            mar_config: Arc::new(MarConfig::from(config)),
        }
    }

    /// Reads the whole request body from `stream` and turns it into a `SessionRequest`.
    ///
    /// The body is first decoded as JSON. If that fails, the entire body is
    /// interpreted as a bare session name and a request without readings is built.
    ///
    /// # Errors
    ///
    /// Fails when the stream cannot be read, when the body is not valid UTF-8, or
    /// when the body is neither valid JSON nor a valid session name. In the last
    /// case the reported error carries the JSON decoding failure.
    fn parse_request(stream: &mut dyn Read) -> Result<SessionRequest> {
        let mut raw = Vec::new();
        stream.read_to_end(&mut raw)?;
        let text = from_utf8(&raw)?;

        serde_json::from_str::<SessionRequest>(text).or_else(|json_err| {
            // Fallback: plain-text body holding only the session name.
            SessionName::from_str(text)
                .map(SessionRequest::new_without_readings)
                .map_err(|_| eyre!("Couldn't parse request: {}", json_err))
        })
    }

    /// Sends a `StartSession` message and waits for the reply.
    ///
    /// # Errors
    ///
    /// Fails if the message cannot be delivered, or with the error carried by the reply.
    fn start_session(
        &self,
        name: SessionName,
        readings: Vec<super::KeyedMetricReading>,
    ) -> Result<()> {
        let message = SessionEventMessage::StartSession { name, readings };
        // `?` unwraps the delivery result; the reply itself is the return value.
        self.session_events_mbox.send_and_wait_for_reply(message)?
    }

    /// Sends a `StopSession` message, including the shared network and MAR
    /// configurations, and waits for the reply.
    ///
    /// # Errors
    ///
    /// Fails if the message cannot be delivered, or with the error carried by the reply.
    fn stop_session(
        &self,
        name: SessionName,
        readings: Vec<super::KeyedMetricReading>,
    ) -> Result<()> {
        let message = SessionEventMessage::StopSession {
            name,
            readings,
            network_config: Arc::clone(&self.network_config),
            mar_config: Arc::clone(&self.mar_config),
        };
        // `?` unwraps the delivery result; the reply itself is the return value.
        self.session_events_mbox.send_and_wait_for_reply(message)?
    }
}
impl HttpHandler for SessionEventHandler {
fn handle_request(&self, request: &mut Request) -> HttpHandlerResult {
if (request.url() != "/v1/session/start" && request.url() != "/v1/session/end")
|| *request.method() != Method::Post
{
return HttpHandlerResult::NotHandled;
}
if self.data_collection_enabled {
match Self::parse_request(request.as_reader()) {
Ok(SessionRequest {
session_name,
readings,
}) => {
if request.url() == "/v1/session/start" {
if let Err(e) = self.start_session(session_name, readings) {
return HttpHandlerResult::Error(format!(
"Failed to start session: {:?}",
e
));
}
} else if request.url() == "/v1/session/end" {
if let Err(e) = self.stop_session(session_name, readings) {
return HttpHandlerResult::Error(format!(
"Failed to end session: {:?}",
e
));
}
}
}
Err(e) => {
warn!("Failed to parse session request: {:?}", e);
return HttpHandlerResult::Error(format!(
"Failed to parse session request: {:?}",
e
));
}
}
}
HttpHandlerResult::Response(Response::empty(200).boxed())
}
}
#[cfg(test)]
mod tests {
use std::{
collections::{BTreeMap, HashSet},
fs::create_dir_all,
num::NonZeroU32,
path::{Path, PathBuf},
str::FromStr,
};
use insta::assert_json_snapshot;
use rstest::{fixture, rstest};
use ssf::{PingMessage, SharedServiceThread};
use tempfile::TempDir;
use tiny_http::{Method, TestRequest};
use crate::{
config::{SessionConfig, MAR_STAGING_SUBDIRECTORY},
http_server::{HttpHandler, HttpHandlerResult},
mar::manifest::{Manifest, Metadata},
metrics::{
hrt::HRT_DEFAULT_MAX_SAMPLES_PER_MIN, KeyedMetricReading, MetricReportManager,
MetricStringKey, MetricValue, MetricsSet, SessionName,
},
test_utils::in_histograms,
util::path::AbsolutePath,
};
use super::*;
use crate::test_utils::setup_logger;
/// Starts a session with a plain-text body, feeds three histogram readings to
/// the metrics thread, and snapshots the metrics held for the session.
#[rstest]
fn test_start_without_stop_session(mut fixture: Fixture) {
    let start_request = TestRequest::new()
        .with_method(Method::Post)
        .with_path("/v1/session/start")
        .with_body("test-session");
    let outcome = fixture.handler.handle_request(&mut start_request.into());
    assert!(matches!(outcome, HttpHandlerResult::Response(_)));

    let mut readings = in_histograms(vec![("foo", 1.0), ("bar", 2.0), ("not-captured", 3.0)]);
    fixture.send_metrics(&mut readings);

    assert_json_snapshot!(fixture.take_session_metrics());
}
/// Starts a session with a JSON body carrying two gauge readings and one
/// report tag, then snapshots the metrics held for the session.
#[rstest]
fn test_start_with_metrics(_setup_logger: (), mut fixture: Fixture) {
    let start_request = TestRequest::new()
        .with_method(Method::Post)
        .with_path("/v1/session/start")
        .with_body("{\"session_name\": \"test-session\",
\"readings\":
[
{\"name\": \"foo\", \"value\": {\"Gauge\": {\"value\": 1.0, \"timestamp\": \"2024-01-01 00:00:00 UTC\"}}},
{\"name\": \"bar\", \"value\": {\"Gauge\": {\"value\": 4.0, \"timestamp\": \"2024-01-01 00:00:00 UTC\"}}},
{\"name\": \"baz\", \"value\": {\"ReportTag\": {\"value\": \"test-tag\", \"timestamp\": \"2024-01-01 00:00:00 UTC\"}}}
]
}");
    assert!(matches!(
        fixture.handler.handle_request(&mut start_request.into()),
        HttpHandlerResult::Response(_)
    ));

    assert_json_snapshot!(fixture.take_session_metrics());
}
/// Starts a session with a plain-text body, ends it with a JSON body carrying
/// two gauge readings, and checks the metric report written to MAR staging.
#[rstest]
fn test_end_with_metrics(_setup_logger: (), fixture: Fixture) {
    let start_request = TestRequest::new()
        .with_method(Method::Post)
        .with_path("/v1/session/start")
        .with_body("test-session");
    let start_outcome = fixture.handler.handle_request(&mut start_request.into());
    assert!(matches!(start_outcome, HttpHandlerResult::Response(_)));

    let end_request = TestRequest::new()
        .with_method(Method::Post)
        .with_path("/v1/session/end")
        .with_body("{\"session_name\": \"test-session\",
\"readings\":
[
{\"name\": \"foo\", \"value\": {\"Gauge\": {\"value\": 1.0, \"timestamp\": \"2024-01-01 00:00:00 UTC\"}}},
{\"name\": \"bar\", \"value\": {\"Gauge\": {\"value\": 3.0, \"timestamp\": \"2024-01-01 00:00:00 UTC\"}}}
]
}");
    let end_outcome = fixture.handler.handle_request(&mut end_request.into());
    assert!(matches!(end_outcome, HttpHandlerResult::Response(_)));

    // Ping the metrics thread before inspecting the staging directory.
    fixture.process_all();
    verify_dumped_metric_report(&fixture.mar_staging_path(), "end_with_metrics")
}
/// Starts the same session twice, sending a batch of histogram readings after
/// each start, then snapshots the metrics held for the session.
#[rstest]
fn test_start_twice_without_stop_session(_setup_logger: (), mut fixture: Fixture) {
    // One batch per start request, in the order they are sent.
    let batches = vec![
        vec![("foo", 10.0), ("bar", 20.0), ("not-captured", 30.0)],
        vec![("foo", 1.0), ("bar", 2.0), ("not-captured", 3.0)],
    ];

    for batch in batches {
        let start_request = TestRequest::new()
            .with_method(Method::Post)
            .with_path("/v1/session/start")
            .with_body("test-session");
        let outcome = fixture.handler.handle_request(&mut start_request.into());
        assert!(matches!(outcome, HttpHandlerResult::Response(_)));

        fixture.send_metrics(&mut in_histograms(batch));
    }

    assert_json_snapshot!(fixture.take_session_metrics());
}
/// Starts a session with an empty readings list, sends histogram readings,
/// ends the session with extra readings, checks the dumped metric report, and
/// finally asserts that taking the session's metrics afterwards is an error.
#[rstest]
fn test_start_then_stop_session(_setup_logger: (), mut fixture: Fixture) {
    let start_request = TestRequest::new()
        .with_method(Method::Post)
        .with_path("/v1/session/start")
        .with_body("{\"session_name\": \"test-session\", \"readings\": []}");
    let start_outcome = fixture.handler.handle_request(&mut start_request.into());
    assert!(matches!(start_outcome, HttpHandlerResult::Response(_)));

    let mut readings = in_histograms(vec![("bar", 20.0), ("not-captured", 30.0)]);
    fixture.send_metrics(&mut readings);

    let end_request = TestRequest::new()
        .with_method(Method::Post)
        .with_path("/v1/session/end")
        .with_body("{\"session_name\": \"test-session\",
\"readings\":
[
{\"name\": \"foo\", \"value\": {\"Gauge\": {\"value\": 100, \"timestamp\": \"2024-01-01 00:00:00 UTC\"}}},
{\"name\": \"baz\", \"value\": {\"ReportTag\": {\"value\": \"test-tag\", \"timestamp\": \"2024-01-01 00:00:00 UTC\"}}}
]
}");
    let end_outcome = fixture.handler.handle_request(&mut end_request.into());
    assert!(matches!(end_outcome, HttpHandlerResult::Response(_)));

    fixture.process_all();
    verify_dumped_metric_report(&fixture.mar_staging_path(), "start_then_stop");

    // The lock guard is a temporary of this statement, so it is released
    // before the assertion runs.
    let take_failed = fixture
        .jig
        .shared()
        .lock()
        .unwrap()
        .take_session_metrics(&SessionName::from_str("test-session").unwrap())
        .is_err();
    assert!(take_failed);
}
#[rstest]
fn test_stop_without_start_session(_setup_logger: (), fixture: Fixture) {
let r = TestRequest::new()
.with_method(Method::Post)
.with_path("/v1/session/end")
.with_body("test-session");
assert!(matches!(
fixture.handler.handle_request(&mut r.into()),
HttpHandlerResult::Error(_)
));
}
/// Shared state handed to each test in this module by the `fixture` function.
struct Fixture {
/// The handler under test.
handler: SessionEventHandler,
/// Service thread wrapping the `MetricReportManager`; the handler's mailbox
/// is obtained from it.
jig: SharedServiceThread<MetricReportManager>,
/// Temporary directory used as the config's `persist_dir`; the MAR staging
/// subdirectory is created inside it.
tempdir: TempDir,
}
impl Fixture {
    /// Pings the service thread, then takes the metrics of the `test-session`
    /// session from the shared manager and returns them as a `BTreeMap`.
    ///
    /// Panics if the lock is poisoned or the manager returns an error.
    fn take_session_metrics(&mut self) -> BTreeMap<MetricStringKey, MetricValue> {
        self.process_all();
        // Single expression on purpose: the lock guard stays a temporary and
        // is dropped together with the value returned by `shared()`.
        BTreeMap::from_iter(
            self.jig
                .shared()
                .lock()
                .unwrap()
                .take_session_metrics(&SessionName::from_str("test-session").unwrap())
                .unwrap(),
        )
    }

    /// Collects `metrics` into a `Vec` and sends it to the service thread,
    /// waiting for the reply.
    fn send_metrics(&mut self, metrics: &mut dyn Iterator<Item = KeyedMetricReading>) {
        let batch: Vec<_> = metrics.collect();
        self.jig
            .mbox()
            .send_and_wait_for_reply(batch)
            .expect("error delivering metrics");
    }

    /// Sends a `PingMessage` to the service thread and waits for the reply.
    fn process_all(&self) {
        let ping = PingMessage {};
        self.jig
            .mbox()
            .send_and_wait_for_reply(ping)
            .expect("unable to ping thread")
    }

    /// Path of the MAR staging subdirectory inside the fixture's temporary directory.
    fn mar_staging_path(&self) -> PathBuf {
        let mut staging = self.tempdir.path().to_path_buf();
        staging.push(MAR_STAGING_SUBDIRECTORY);
        staging
    }
}
/// Builds the test fixture: a `MetricReportManager` service thread configured
/// with one session (`test-session`, capturing `foo`, `bar` and `baz`), a
/// temporary persist directory containing the MAR staging subdirectory, and a
/// handler with data collection enabled.
#[fixture]
fn fixture() -> Fixture {
    let captured_metrics = ["foo", "bar", "baz"]
        .iter()
        .map(|key| MetricStringKey::from_str(key).unwrap())
        .collect::<HashSet<_>>();
    let session_config = SessionConfig {
        name: SessionName::from_str("test-session").unwrap(),
        captured_metrics,
    };

    let manager = MetricReportManager::new_with_session_configs(
        true,
        NonZeroU32::new(HRT_DEFAULT_MAX_SAMPLES_PER_MIN).unwrap(),
        &[session_config],
        true,
        MetricsSet::empty(),
    );
    let jig = SharedServiceThread::spawn_with(manager);

    let tempdir = TempDir::new().unwrap();
    create_dir_all(tempdir.path().join(MAR_STAGING_SUBDIRECTORY)).unwrap();

    let mut config = Config::test_fixture();
    config.config_file.persist_dir = AbsolutePath::try_from(tempdir.path().to_owned()).unwrap();

    let handler = SessionEventHandler::new(true, jig.mbox().into(), &config);

    Fixture {
        handler,
        jig,
        tempdir,
    }
}
/// Asserts that `mar_staging_path` contains exactly one entry, parses its
/// `manifest.json`, and snapshots the manifest metadata under `test_name`.
///
/// Panics if the metadata is not `Metadata::LinuxMetricReport`. The two
/// duration fields are redacted to `0` in the snapshot.
fn verify_dumped_metric_report(mar_staging_path: &Path, test_name: &str) {
    // Unreadable directory entries are skipped, not treated as failures.
    let entries: Vec<_> = std::fs::read_dir(mar_staging_path)
        .expect("Failed to read temp dir")
        .flatten()
        .collect();
    assert_eq!(entries.len(), 1);

    let manifest_path = entries[0].path().join("manifest.json");
    let manifest_json = std::fs::read_to_string(manifest_path).unwrap();
    let manifest: Manifest = serde_json::from_str(&manifest_json).unwrap();

    match manifest.metadata {
        Metadata::LinuxMetricReport { .. } => {
            assert_json_snapshot!(test_name, manifest.metadata, {".metadata.duration_ms" => 0, ".metadata.boottime_duration_ms" => 0});
        }
        _ => panic!("Unexpected metadata type"),
    }
}
}