nv_redfish/event_service/
mod.rs1mod patch;
21
22use crate::patch_support::ReadPatchFn;
23use crate::schema::redfish::event_service::EventService as EventServiceSchema;
24use crate::Error;
25use crate::NvBmc;
26use crate::Resource;
27use crate::ResourceSchema;
28use crate::ServiceRoot;
29use futures_util::future;
30use futures_util::TryStreamExt as _;
31use nv_redfish_core::odata::ODataType;
32use nv_redfish_core::Bmc;
33use nv_redfish_core::BoxTryStream;
34use serde::de;
35use serde::Deserialize;
36use serde::Deserializer;
37use serde_json::Value as JsonValue;
38use std::sync::Arc;
39
40#[doc(inline)]
41pub use crate::schema::redfish::metric_report::MetricReport;
42
43#[doc(inline)]
44pub use crate::schema::redfish::event::Event;
45
46#[derive(Debug)]
48pub enum EventStreamPayload {
49 Event(Event),
51 MetricReport(MetricReport),
53}
54
55impl<'de> Deserialize<'de> for EventStreamPayload {
56 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
57 where
58 D: Deserializer<'de>,
59 {
60 let value = JsonValue::deserialize(deserializer)?;
61 let odata_type = ODataType::parse_from(&value)
62 .ok_or_else(|| de::Error::missing_field("missing @odata.type in SSE payload"))?;
63
64 if odata_type.type_name == "MetricReport" {
65 let payload =
66 serde_json::from_value::<MetricReport>(value).map_err(de::Error::custom)?;
67 Ok(Self::MetricReport(payload))
68 } else if odata_type.type_name == "Event" {
69 let payload = serde_json::from_value::<Event>(value).map_err(de::Error::custom)?;
70 Ok(Self::Event(payload))
71 } else {
72 Err(de::Error::custom(format!(
73 "unsupported @odata.type in SSE payload: {}, should be either Event or MetricReport", odata_type.type_name
74 )))
75 }
76 }
77}
78
79pub struct EventService<B: Bmc> {
84 data: Arc<EventServiceSchema>,
85 bmc: NvBmc<B>,
86 sse_read_patches: Vec<ReadPatchFn>,
87}
88
89impl<B: Bmc> EventService<B> {
90 pub(crate) async fn new(
92 bmc: &NvBmc<B>,
93 root: &ServiceRoot<B>,
94 ) -> Result<Option<Self>, Error<B>> {
95 if let Some(service_ref) = &root.root.event_service {
96 let data = service_ref.get(bmc.as_ref()).await.map_err(Error::Bmc)?;
97
98 let mut sse_read_patches = Vec::new();
99 let mut sse_event_record_patches: Vec<patch::EventRecordPatchFn> = Vec::new();
100
101 if bmc.quirks.event_service_sse_no_member_id() {
102 sse_event_record_patches.push(patch::patch_missing_event_record_member_id);
103 }
104 if bmc.quirks.event_service_sse_wrong_event_type() {
105 sse_event_record_patches.push(patch::patch_unknown_or_missing_event_type_to_other);
106 }
107 if bmc.quirks.event_service_sse_no_odata_id() {
108 let patch_event_id: ReadPatchFn =
109 Arc::new(patch::patch_missing_event_odata_id as fn(JsonValue) -> JsonValue);
110 sse_read_patches.push(patch_event_id);
111 sse_event_record_patches.push(patch::patch_missing_event_record_odata_id);
112 }
113 if bmc.quirks.event_service_sse_wrong_timestamp_offset() {
114 sse_event_record_patches.push(patch::patch_compact_event_timestamp_offset);
115 }
116
117 if !sse_event_record_patches.is_empty() {
118 let patch_event_records: ReadPatchFn = Arc::new(move |payload| {
119 patch::patch_event_records(payload, &sse_event_record_patches)
120 });
121 sse_read_patches.push(patch_event_records);
122 }
123
124 Ok(Some(Self {
125 data,
126 bmc: bmc.clone(),
127 sse_read_patches,
128 }))
129 } else {
130 Ok(None)
131 }
132 }
133
134 #[must_use]
136 pub fn raw(&self) -> Arc<EventServiceSchema> {
137 self.data.clone()
138 }
139
140 pub async fn events(&self) -> Result<BoxTryStream<EventStreamPayload, Error<B>>, Error<B>>
153 where
154 B: 'static,
155 B::Error: 'static,
156 {
157 let stream_uri = self
158 .data
159 .server_sent_event_uri
160 .as_ref()
161 .ok_or(Error::EventServiceServerSentEventUriNotAvailable)?;
162
163 let stream = self
164 .bmc
165 .as_ref()
166 .stream::<JsonValue>(stream_uri)
167 .await
168 .map_err(Error::Bmc)?;
169
170 let sse_read_patches = self.sse_read_patches.clone();
171 let stream = stream.map_err(Error::Bmc).and_then(move |payload| {
172 let patched = sse_read_patches
173 .iter()
174 .fold(payload, |acc, patch| patch(acc));
175
176 future::ready(
177 serde_json::from_value::<EventStreamPayload>(patched).map_err(Error::Json),
178 )
179 });
180
181 Ok(Box::pin(stream))
182 }
183}
184
185impl<B: Bmc> Resource for EventService<B> {
186 fn resource_ref(&self) -> &ResourceSchema {
187 &self.data.as_ref().base
188 }
189}
190
191#[cfg(test)]
192mod tests {
193 use super::EventStreamPayload;
194
195 #[test]
196 fn event_stream_payload_deserializes_event_record() {
197 let value = serde_json::json!({
198 "@odata.id": "/redfish/v1/EventService/SSE#/Event1",
199 "@odata.type": "#Event.v1_6_0.Event",
200 "Id": "1",
201 "Name": "Event Array",
202 "Context": "ABCDEFGH",
203 "Events": [
204 {
205 "@odata.id": "/redfish/v1/EventService/SSE#/Events/88",
206 "MemberId": "88",
207 "EventId": "88",
208 "EventTimestamp": "2026-02-19T03:55:29+00:00",
209 "EventType": "Alert",
210 "LogEntry": {
211 "@odata.id": "/redfish/v1/Systems/System_0/LogServices/EventLog/Entries/1674"
212 },
213 "Message": "The resource has been removed successfully.",
214 "MessageId": "ResourceEvent.1.2.ResourceRemoved",
215 "MessageSeverity": "OK",
216 "OriginOfCondition": {
217 "@odata.id": "/redfish/v1/AccountService/Accounts/1"
218 }
219 }
220 ]
221 });
222
223 let payload: EventStreamPayload =
224 serde_json::from_value(value).expect("event payload must deserialize");
225 assert!(matches!(payload, EventStreamPayload::Event(_)));
226 }
227
228 #[test]
229 fn event_stream_payload_deserializes_metric_report() {
230 let value = serde_json::json!({
231 "@odata.id": "/redfish/v1/TelemetryService/MetricReports/AvgPlatformPowerUsage",
232 "@odata.type": "#MetricReport.v1_3_0.MetricReport",
233 "Id": "AvgPlatformPowerUsage",
234 "Name": "Average Platform Power Usage metric report",
235 "MetricReportDefinition": {
236 "@odata.id": "/redfish/v1/TelemetryService/MetricReportDefinitions/AvgPlatformPowerUsage"
237 },
238 "MetricValues": [
239 {
240 "MetricId": "AverageConsumedWatts",
241 "MetricValue": "100",
242 "Timestamp": "2016-11-08T12:25:00-05:00",
243 "MetricProperty": "/redfish/v1/Chassis/Tray_1/Power#/0/PowerConsumedWatts"
244 },
245 {
246 "MetricId": "AverageConsumedWatts",
247 "MetricValue": "94",
248 "Timestamp": "2016-11-08T13:25:00-05:00",
249 "MetricProperty": "/redfish/v1/Chassis/Tray_1/Power#/0/PowerConsumedWatts"
250 },
251 {
252 "MetricId": "AverageConsumedWatts",
253 "MetricValue": "100",
254 "Timestamp": "2016-11-08T14:25:00-05:00",
255 "MetricProperty": "/redfish/v1/Chassis/Tray_1/Power#/0/PowerConsumedWatts"
256 }
257 ]
258 });
259
260 let payload: EventStreamPayload =
261 serde_json::from_value(value).expect("metric report payload must deserialize");
262 assert!(matches!(payload, EventStreamPayload::MetricReport(_)));
263 }
264}