mod patch;
use crate::patch_support::ReadPatchFn;
use crate::schema::event_service::EventService as EventServiceSchema;
use crate::Error;
use crate::NvBmc;
use crate::Resource;
use crate::ResourceSchema;
use crate::ServiceRoot;
use futures_util::future;
use futures_util::TryStreamExt as _;
use nv_redfish_core::odata::ODataType;
use nv_redfish_core::Bmc;
use nv_redfish_core::BoxTryStream;
use serde::de;
use serde::Deserialize;
use serde::Deserializer;
use serde_json::Value as JsonValue;
use std::sync::Arc;
#[doc(inline)]
pub use crate::schema::metric_report::MetricReport;
#[doc(inline)]
pub use crate::schema::event::Event;
/// Payload carried by a single message of the event service SSE stream.
///
/// The variant is selected from the payload's `@odata.type` when the value is
/// deserialized.
#[derive(Debug)]
pub enum EventStreamPayload {
/// Payload whose `@odata.type` type name is `Event`.
Event(Event),
/// Payload whose `@odata.type` type name is `MetricReport`.
MetricReport(MetricReport),
}
impl<'de> Deserialize<'de> for EventStreamPayload {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let value = JsonValue::deserialize(deserializer)?;
let odata_type = ODataType::parse_from(&value)
.ok_or_else(|| de::Error::missing_field("missing @odata.type in SSE payload"))?;
if odata_type.type_name == "MetricReport" {
let payload =
serde_json::from_value::<MetricReport>(value).map_err(de::Error::custom)?;
Ok(Self::MetricReport(payload))
} else if odata_type.type_name == "Event" {
let payload = serde_json::from_value::<Event>(value).map_err(de::Error::custom)?;
Ok(Self::Event(payload))
} else {
Err(de::Error::custom(format!(
"unsupported @odata.type in SSE payload: {}, should be either Event or MetricReport", odata_type.type_name
)))
}
}
}
/// Handle to a BMC's event service.
///
/// Holds the fetched event service data, a handle to the BMC it was read from,
/// and the patches applied to each SSE payload before it is deserialized.
pub struct EventService<B: Bmc> {
/// Event service resource data fetched when the handle was created.
data: Arc<EventServiceSchema>,
/// BMC handle used to open the SSE stream.
bmc: NvBmc<B>,
/// Patches applied, in order, to every raw SSE payload before deserialization.
sse_read_patches: Vec<ReadPatchFn>,
}
impl<B: Bmc> EventService<B> {
pub(crate) async fn new(
bmc: &NvBmc<B>,
root: &ServiceRoot<B>,
) -> Result<Option<Self>, Error<B>> {
if let Some(service_ref) = &root.root.event_service {
let data = service_ref.get(bmc.as_ref()).await.map_err(Error::Bmc)?;
let mut sse_read_patches = Vec::new();
let mut sse_event_record_patches: Vec<patch::EventRecordPatchFn> = Vec::new();
if bmc.quirks.event_service_sse_no_member_id() {
sse_event_record_patches.push(patch::patch_missing_event_record_member_id);
}
if bmc.quirks.event_service_sse_missing_event_type() {
sse_event_record_patches.push(patch::patch_missing_event_type_to_unsupported);
}
if bmc.quirks.event_service_sse_no_odata_id() {
let patch_event_id: ReadPatchFn =
Arc::new(patch::patch_missing_event_odata_id as fn(JsonValue) -> JsonValue);
sse_read_patches.push(patch_event_id);
sse_event_record_patches.push(patch::patch_missing_event_record_odata_id);
}
if bmc.quirks.event_service_sse_wrong_timestamp_offset() {
sse_event_record_patches.push(patch::patch_compact_event_timestamp_offset);
}
if !sse_event_record_patches.is_empty() {
let patch_event_records: ReadPatchFn = Arc::new(move |payload| {
patch::patch_event_records(payload, &sse_event_record_patches)
});
sse_read_patches.push(patch_event_records);
}
Ok(Some(Self {
data,
bmc: bmc.clone(),
sse_read_patches,
}))
} else {
Ok(None)
}
}
#[must_use]
pub fn raw(&self) -> Arc<EventServiceSchema> {
self.data.clone()
}
pub async fn events(&self) -> Result<BoxTryStream<EventStreamPayload, Error<B>>, Error<B>>
where
B: 'static,
B::Error: 'static,
{
let stream_uri = self
.data
.server_sent_event_uri
.as_ref()
.ok_or(Error::EventServiceServerSentEventUriNotAvailable)?;
let stream = self
.bmc
.as_ref()
.stream::<JsonValue>(stream_uri)
.await
.map_err(Error::Bmc)?;
let sse_read_patches = self.sse_read_patches.clone();
let stream = stream.map_err(Error::Bmc).and_then(move |payload| {
let patched = sse_read_patches
.iter()
.fold(payload, |acc, patch| patch(acc));
future::ready(
serde_json::from_value::<EventStreamPayload>(patched).map_err(Error::Json),
)
});
Ok(Box::pin(stream))
}
}
impl<B: Bmc> Resource for EventService<B> {
fn resource_ref(&self) -> &ResourceSchema {
&self.data.as_ref().base
}
}
/// Unit tests for `EventStreamPayload` deserialization.
#[cfg(test)]
mod tests {
    use super::EventStreamPayload;

    /// An `Event` payload must be decoded into the `Event` variant.
    #[test]
    fn event_stream_payload_deserializes_event_record() {
        let value = serde_json::json!({
            "@odata.id": "/redfish/v1/EventService/SSE#/Event1",
            "@odata.type": "#Event.v1_6_0.Event",
            "Id": "1",
            "Name": "Event Array",
            "Context": "ABCDEFGH",
            "Events": [
                {
                    "@odata.id": "/redfish/v1/EventService/SSE#/Events/88",
                    "MemberId": "88",
                    "EventId": "88",
                    "EventTimestamp": "2026-02-19T03:55:29+00:00",
                    "EventType": "Alert",
                    "LogEntry": {
                        "@odata.id": "/redfish/v1/Systems/System_0/LogServices/EventLog/Entries/1674"
                    },
                    "Message": "The resource has been removed successfully.",
                    "MessageId": "ResourceEvent.1.2.ResourceRemoved",
                    "MessageSeverity": "OK",
                    "OriginOfCondition": {
                        "@odata.id": "/redfish/v1/AccountService/Accounts/1"
                    }
                }
            ]
        });
        let payload: EventStreamPayload =
            serde_json::from_value(value).expect("event payload must deserialize");
        assert!(matches!(payload, EventStreamPayload::Event(_)));
    }

    /// A `MetricReport` payload must be decoded into the `MetricReport` variant.
    #[test]
    fn event_stream_payload_deserializes_metric_report() {
        let value = serde_json::json!({
            "@odata.id": "/redfish/v1/TelemetryService/MetricReports/AvgPlatformPowerUsage",
            "@odata.type": "#MetricReport.v1_3_0.MetricReport",
            "Id": "AvgPlatformPowerUsage",
            "Name": "Average Platform Power Usage metric report",
            "MetricReportDefinition": {
                "@odata.id": "/redfish/v1/TelemetryService/MetricReportDefinitions/AvgPlatformPowerUsage"
            },
            "MetricValues": [
                {
                    "MetricId": "AverageConsumedWatts",
                    "MetricValue": "100",
                    "Timestamp": "2016-11-08T12:25:00-05:00",
                    "MetricProperty": "/redfish/v1/Chassis/Tray_1/Power#/0/PowerConsumedWatts"
                },
                {
                    "MetricId": "AverageConsumedWatts",
                    "MetricValue": "94",
                    "Timestamp": "2016-11-08T13:25:00-05:00",
                    "MetricProperty": "/redfish/v1/Chassis/Tray_1/Power#/0/PowerConsumedWatts"
                },
                {
                    "MetricId": "AverageConsumedWatts",
                    "MetricValue": "100",
                    "Timestamp": "2016-11-08T14:25:00-05:00",
                    "MetricProperty": "/redfish/v1/Chassis/Tray_1/Power#/0/PowerConsumedWatts"
                }
            ]
        });
        let payload: EventStreamPayload =
            serde_json::from_value(value).expect("metric report payload must deserialize");
        assert!(matches!(payload, EventStreamPayload::MetricReport(_)));
    }

    /// A payload of any other type must be rejected rather than coerced into
    /// one of the supported variants.
    #[test]
    fn event_stream_payload_rejects_unsupported_odata_type() {
        let value = serde_json::json!({
            "@odata.id": "/redfish/v1/Systems/System_0/LogServices/EventLog/Entries/1674",
            "@odata.type": "#LogEntry.v1_15_0.LogEntry",
            "Id": "1674",
            "Name": "Log Entry"
        });
        assert!(serde_json::from_value::<EventStreamPayload>(value).is_err());
    }

    /// Without `@odata.type` there is nothing to dispatch on, so decoding must
    /// fail.
    #[test]
    fn event_stream_payload_rejects_missing_odata_type() {
        let value = serde_json::json!({
            "Id": "1",
            "Name": "Event Array"
        });
        assert!(serde_json::from_value::<EventStreamPayload>(value).is_err());
    }
}