grafbase_sdk/host_io/event_queue.rs
1//! Event queuing functionality for tracking and recording request events.
2//!
3//! This module provides comprehensive event queuing capabilities for tracking various
4//! types of operations and requests within the Grafbase Gateway system. It supports tracking
5//! of GraphQL operations, subgraph requests, HTTP requests, and custom extension logs.
6//!
7//! # Overview
8//!
9//! The event queue system is designed to capture detailed information about:
10//! - GraphQL operation execution (including timing, caching, and status)
11//! - Subgraph request details (including retries, caching, and response times)
12//! - HTTP request execution
13//! - Custom extension logs with serializable data
14//!
15//! # Example
16//!
17//! ```no_run
18//! use grafbase_sdk::host_io::event_queue;
19//! use serde::Serialize;
20//!
21//! #[derive(Serialize)]
22//! struct CustomEvent {
23//! user_id: String,
24//! action: String,
25//! timestamp: u64,
26//! }
27//!
28//! // Send a custom event
29//! let log = CustomEvent {
30//! user_id: "user123".to_string(),
31//! action: "query_execution".to_string(),
32//! timestamp: 1234567890,
33//! };
34//!
35//! event_queue::send("custom_event", log).expect("Failed to send event");
36//! ```
37//!
38//! # Log Aggregation
39//!
40//! By itself, event queue calls do nothing in the Grafbase Gateway. You must implement
41//! an [`Hosts`] type of an extension with event filtering, which will be called after
42//! a response is sent back to the user.
43
44use std::time::Duration;
45
46use crate::{SdkError, wit};
47
48/// Sends an event queue entry to the system.
49///
50/// This function serializes the provided log data and sends it to the event queue
51/// system. The log data can be any type that implements `serde::Serialize`.
52///
53/// # Arguments
54///
55/// * `name` - The name of the event to be logged. Used in event filtering.
56/// * `data` - The log data to be sent. Must implement `serde::Serialize`.
57///
58/// # Returns
59///
60/// Returns `Ok(())` on success, or an `SdkError` if serialization fails.
61///
62/// # Example
63///
64/// ```no_run
65/// use serde::Serialize;
66/// use grafbase_sdk::host_io::event_queue;
67///
68/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
69/// #[derive(Serialize)]
70/// struct UserAction<'a> {
71/// action: &'a str,
72/// user_id: &'a str,
73/// }
74///
75/// let action = UserAction {
76/// action: "login",
77/// user_id: "user123"
78/// };
79///
80/// event_queue::send("user_action", action)?;
81/// # Ok(())
82/// # }
83/// ```
84pub fn send<T>(name: &str, data: T) -> Result<(), SdkError>
85where
86 T: serde::Serialize,
87{
88 if !crate::component::can_skip_sending_events() {
89 let data = minicbor_serde::to_vec(data)?;
90 crate::component::queue_event(name, &data);
91 }
92
93 Ok(())
94}
95
96/// A queue of event queue per request from the engine.
97///
98/// This struct provides access to event queue that have been generated during
99/// request processing. Logs can be retrieved and processed using the `pop` method.
100pub struct EventQueue(wit::EventQueue);
101
102impl From<wit::EventQueue> for EventQueue {
103 fn from(value: wit::EventQueue) -> Self {
104 Self(value)
105 }
106}
107
108impl EventQueue {
109 /// Retrieves and removes the next log entry from the queue.
110 pub fn pop(&self) -> Option<Event> {
111 self.0.pop().map(Into::into)
112 }
113}
114
115/// Represents different types of event queue entries.
116///
117/// This enum categorizes the various types of operations and requests that can be
118/// logged in the event queue system.
119pub enum Event {
120 /// A GraphQL operation that was executed.
121 Operation(ExecutedOperation),
122 /// A request made to a subgraph.
123 Subgraph(ExecutedSubgraphRequest),
124 /// An HTTP request that was executed.
125 Http(ExecutedHttpRequest),
126 /// A custom extension log entry with serialized data.
127 Extension(ExtensionEvent),
128}
129
130impl From<wit::Event> for Event {
131 fn from(value: wit::Event) -> Self {
132 match value {
133 wit::Event::Operation(executed_operation) => Self::Operation(executed_operation.into()),
134 wit::Event::Subgraph(executed_subgraph_request) => Self::Subgraph(executed_subgraph_request.into()),
135 wit::Event::Http(executed_http_request) => Self::Http(executed_http_request.into()),
136 wit::Event::Extension(items) => Self::Extension(ExtensionEvent(items)),
137 }
138 }
139}
140
141/// Represents an executed GraphQL operation with detailed metrics.
142pub struct ExecutedOperation(wit::ExecutedOperation);
143
144impl From<wit::ExecutedOperation> for ExecutedOperation {
145 fn from(value: wit::ExecutedOperation) -> Self {
146 Self(value)
147 }
148}
149
150impl ExecutedOperation {
151 /// Returns the name of the GraphQL operation, if available.
152 pub fn name(&self) -> Option<&str> {
153 self.0.name.as_deref()
154 }
155
156 /// Returns the GraphQL document (query/mutation/subscription) that was executed.
157 /// The operation is in normalized form, with all possible user data removed.
158 pub fn document(&self) -> &str {
159 &self.0.document
160 }
161
162 /// Returns the duration spent preparing the operation for execution.
163 ///
164 /// This includes parsing, validation, and query planning time.
165 pub fn prepare_duration(&self) -> Duration {
166 Duration::from_nanos(self.0.prepare_duration_ns)
167 }
168
169 /// Returns the total duration of the operation execution.
170 ///
171 /// This includes the actual execution time after preparation.
172 pub fn duration(&self) -> Duration {
173 Duration::from_nanos(self.0.duration_ns)
174 }
175
176 /// Indicates whether a cached execution plan was used for this operation.
177 pub fn cached_plan(&self) -> bool {
178 self.0.cached_plan
179 }
180
181 /// Returns the status of the GraphQL response.
182 pub fn status(&self) -> GraphqlResponseStatus {
183 self.0.status.into()
184 }
185}
186
187/// Represents the status of a GraphQL response.
188///
189/// This enum categorizes the different outcomes of a GraphQL operation execution.
190pub enum GraphqlResponseStatus {
191 /// The operation completed successfully without errors.
192 Success,
193 /// The operation completed but encountered field-level errors.
194 FieldError(FieldError),
195 /// The operation failed due to request-level errors.
196 RequestError(RequestError),
197 /// The request was refused before execution (e.g., due to authentication or rate limiting).
198 RefusedRequest,
199}
200
201/// Contains information about field-level errors in a GraphQL response.
202pub struct FieldError {
203 /// The number of field errors encountered.
204 pub count: u64,
205 /// Indicates whether the data field in the response is null.
206 pub data_is_null: bool,
207}
208
209/// Contains information about request-level errors in a GraphQL response.
210pub struct RequestError {
211 /// The number of request errors encountered.
212 pub count: u64,
213}
214
215impl From<wit::GraphqlResponseStatus> for GraphqlResponseStatus {
216 fn from(value: wit::GraphqlResponseStatus) -> Self {
217 match value {
218 wit::GraphqlResponseStatus::Success => GraphqlResponseStatus::Success,
219 wit::GraphqlResponseStatus::FieldError(wit::FieldError { count, data_is_null }) => {
220 GraphqlResponseStatus::FieldError(FieldError { count, data_is_null })
221 }
222 wit::GraphqlResponseStatus::RequestError(wit::RequestError { count }) => {
223 GraphqlResponseStatus::RequestError(RequestError { count })
224 }
225 wit::GraphqlResponseStatus::RefusedRequest => GraphqlResponseStatus::RefusedRequest,
226 }
227 }
228}
229
230/// Represents a request made to a subgraph with detailed execution information.
231///
232/// This struct contains comprehensive information about a subgraph request,
233/// including retry attempts, caching status, and timing metrics.
234pub struct ExecutedSubgraphRequest(wit::ExecutedSubgraphRequest);
235
236impl ExecutedSubgraphRequest {
237 /// Returns the name of the subgraph that was queried.
238 pub fn subgraph_name(&self) -> &str {
239 &self.0.subgraph_name
240 }
241
242 /// Returns the HTTP method used for the subgraph request.
243 pub fn method(&self) -> http::Method {
244 self.0.method.into()
245 }
246
247 /// Returns the URL of the subgraph endpoint.
248 pub fn url(&self) -> &str {
249 &self.0.url
250 }
251
252 /// Returns an iterator over all execution attempts for this subgraph request.
253 ///
254 /// This includes both successful responses and various types of failures
255 /// (e.g., rate limiting, server errors).
256 pub fn executions(&self) -> impl Iterator<Item = RequestExecution> {
257 self.0.executions.clone().into_iter().map(RequestExecution::from)
258 }
259
260 /// Returns the cache status for this subgraph request.
261 pub fn cache_status(&self) -> CacheStatus {
262 self.0.cache_status.into()
263 }
264
265 /// Returns the total duration of all execution attempts.
266 pub fn total_duration(&self) -> Duration {
267 Duration::from_nanos(self.0.total_duration_ns)
268 }
269
270 /// Indicates whether any errors were encountered during the subgraph request.
271 pub fn has_errors(&self) -> bool {
272 self.0.has_errors
273 }
274}
275
276impl From<wit::ExecutedSubgraphRequest> for ExecutedSubgraphRequest {
277 fn from(value: wit::ExecutedSubgraphRequest) -> Self {
278 Self(value)
279 }
280}
281
282/// Represents a single execution attempt of a subgraph request.
283///
284/// This enum captures the different outcomes of attempting to execute a request
285/// to a subgraph endpoint.
286pub enum RequestExecution {
287 /// The subgraph returned a 5xx server error.
288 InternalServerError,
289 /// The request failed due to network or other request-level errors.
290 RequestError,
291 /// The request was rate limited by the engine rate limiter.
292 RateLimited,
293 /// The subgraph returned a response (which may still contain GraphQL errors).
294 Response(SubgraphResponse),
295}
296
297impl From<wit::SubgraphRequestExecutionKind> for RequestExecution {
298 fn from(value: wit::SubgraphRequestExecutionKind) -> Self {
299 match value {
300 wit::SubgraphRequestExecutionKind::InternalServerError => Self::InternalServerError,
301 wit::SubgraphRequestExecutionKind::RequestError => Self::RequestError,
302 wit::SubgraphRequestExecutionKind::RateLimited => Self::RateLimited,
303 wit::SubgraphRequestExecutionKind::Response(subgraph_response) => {
304 Self::Response(SubgraphResponse(subgraph_response))
305 }
306 }
307 }
308}
309
310/// Contains timing and status information for a successful subgraph response.
311pub struct SubgraphResponse(wit::SubgraphResponse);
312
313impl SubgraphResponse {
314 /// Returns the time taken to establish a connection to the subgraph.
315 pub fn connection_time(&self) -> Duration {
316 Duration::from_nanos(self.0.connection_time_ns)
317 }
318
319 /// Returns the time taken to receive the complete response from the subgraph.
320 pub fn response_time(&self) -> Duration {
321 Duration::from_nanos(self.0.response_time_ns)
322 }
323
324 /// Returns the HTTP status code of the subgraph response.
325 pub fn status(&self) -> http::StatusCode {
326 http::StatusCode::from_u16(self.0.status_code).expect("this comes from reqwest")
327 }
328}
329
330/// Represents the cache status of a subgraph request.
331pub enum CacheStatus {
332 /// The entire response was served from cache.
333 Hit,
334 /// Part of the response was served from cache, but some data required fetching.
335 PartialHit,
336 /// No cached data was available; the entire response was fetched from the subgraph.
337 Miss,
338}
339
340impl From<wit::CacheStatus> for CacheStatus {
341 fn from(value: wit::CacheStatus) -> Self {
342 match value {
343 wit::CacheStatus::Hit => Self::Hit,
344 wit::CacheStatus::PartialHit => Self::PartialHit,
345 wit::CacheStatus::Miss => Self::Miss,
346 }
347 }
348}
349
350/// Represents an HTTP request that was executed.
351///
352/// This struct contains information about non-GraphQL HTTP requests made by the system.
353pub struct ExecutedHttpRequest(wit::ExecutedHttpRequest);
354
355impl ExecutedHttpRequest {
356 /// Returns the HTTP method used for the request.
357 ///
358 /// # Returns
359 ///
360 /// An `http::Method` representing the HTTP method (GET, POST, etc.).
361 pub fn method(&self) -> http::Method {
362 self.0.method.into()
363 }
364
365 /// Returns the URL of the HTTP request.
366 ///
367 /// # Returns
368 ///
369 /// The full URL as a string slice.
370 pub fn url(&self) -> &str {
371 &self.0.url
372 }
373
374 /// Returns the HTTP status code of the response.
375 ///
376 /// # Returns
377 ///
378 /// An `http::StatusCode` representing the response status.
379 pub fn response_status(&self) -> http::StatusCode {
380 http::StatusCode::from_u16(self.0.status_code).expect("this comes from engine")
381 }
382}
383
384impl From<wit::ExecutedHttpRequest> for ExecutedHttpRequest {
385 fn from(value: wit::ExecutedHttpRequest) -> Self {
386 Self(value)
387 }
388}
389
390/// Represents a custom extension log entry with serialized data.
391///
392/// Extension logs allow custom data to be included in the event queue stream.
393/// The data is serialized using CBOR format and can be deserialized into
394/// the appropriate type.
395pub struct ExtensionEvent(wit::ExtensionEvent);
396
397impl ExtensionEvent {
398 /// Event name
399 pub fn event_name(&self) -> &str {
400 &self.0.event_name
401 }
402
403 /// Extension name which produced this event
404 pub fn extension_name(&self) -> &str {
405 &self.0.extension_name
406 }
407
408 /// Deserializes the extension log data into the specified type.
409 ///
410 /// # Type Parameters
411 ///
412 /// * `T` - The type to deserialize into. Must implement `serde::Deserialize`.
413 ///
414 /// # Returns
415 ///
416 /// Returns `Ok(T)` with the deserialized data on success, or an `SdkError` if
417 /// deserialization fails.
418 ///
419 /// # Example
420 ///
421 /// ```ignore
422 /// use serde::Deserialize;
423 /// use grafbase_sdk::host_io::event_queue::Event;
424 ///
425 /// #[derive(Deserialize)]
426 /// struct CustomLog {
427 /// user_id: String,
428 /// action: String,
429 /// }
430 ///
431 /// // Assuming we have an ExtensionLogEntry
432 /// let log_entry: ExtensionEvent = // ... obtained from elsewhere
433 /// # todo!();
434 ///
435 /// match log_entry.deserialize::<CustomLog>() {
436 /// Ok(custom_log) => {
437 /// println!("User {} performed: {}", custom_log.user_id, custom_log.action);
438 /// }
439 /// Err(e) => {
440 /// eprintln!("Failed to deserialize log: {:?}", e);
441 /// }
442 /// }
443 /// ```
444 pub fn deserialize<'de, T>(&'de self) -> Result<T, SdkError>
445 where
446 T: serde::Deserialize<'de>,
447 {
448 let data = minicbor_serde::from_slice(&self.0.data)?;
449
450 Ok(data)
451 }
452}