grafbase_sdk/host_io/
event_queue.rs

1//! Event queuing functionality for tracking and recording request events.
2//!
3//! This module provides comprehensive event queuing capabilities for tracking various
4//! types of operations and requests within the Grafbase Gateway system. It supports tracking
5//! of GraphQL operations, subgraph requests, HTTP requests, and custom extension logs.
6//!
7//! # Overview
8//!
9//! The event queue system is designed to capture detailed information about:
10//! - GraphQL operation execution (including timing, caching, and status)
11//! - Subgraph request details (including retries, caching, and response times)
12//! - HTTP request execution
13//! - Custom extension logs with serializable data
14//!
15//! # Example
16//!
17//! ```no_run
18//! use grafbase_sdk::host_io::event_queue;
19//! use serde::Serialize;
20//!
21//! #[derive(Serialize)]
22//! struct CustomEvent {
23//!     user_id: String,
24//!     action: String,
25//!     timestamp: u64,
26//! }
27//!
28//! // Send a custom event
29//! let log = CustomEvent {
30//!     user_id: "user123".to_string(),
31//!     action: "query_execution".to_string(),
32//!     timestamp: 1234567890,
33//! };
34//!
35//! event_queue::send("custom_event", log).expect("Failed to send event");
36//! ```
37//!
38//! # Log Aggregation
39//!
//! On their own, event queue calls do nothing in the Grafbase Gateway. You must implement
//! a [`Hosts`] type of an extension with event filtering, which will be called after
//! a response is sent back to the user.
43
44use std::time::Duration;
45
46use crate::{SdkError, types::SharedHttpHeaders, wit};
47
48/// Sends an event queue entry to the system.
49///
50/// This function serializes the provided log data and sends it to the event queue
51/// system. The log data can be any type that implements `serde::Serialize`.
52///
53/// # Arguments
54///
55/// * `name` - The name of the event to be logged. Used in event filtering.
56/// * `data` - The log data to be sent. Must implement `serde::Serialize`.
57///
58/// # Returns
59///
60/// Returns `Ok(())` on success, or an `SdkError` if serialization fails.
61///
62/// # Example
63///
64/// ```no_run
65/// use serde::Serialize;
66/// use grafbase_sdk::host_io::event_queue;
67///
68/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
69/// #[derive(Serialize)]
70/// struct UserAction<'a> {
71///     action: &'a str,
72///     user_id: &'a str,
73/// }
74///
75/// let action = UserAction {
76///     action: "login",
77///     user_id: "user123"
78/// };
79///
80/// event_queue::send("user_action", action)?;
81/// # Ok(())
82/// # }
83/// ```
84pub fn send<T>(name: &str, data: T) -> Result<(), SdkError>
85where
86    T: serde::Serialize,
87{
88    if !crate::component::can_skip_sending_events() {
89        let data = minicbor_serde::to_vec(data)?;
90        crate::component::queue_event(name, &data);
91    }
92
93    Ok(())
94}
95
96/// A queue of event queue per request from the engine.
97///
98/// This struct provides access to event queue that have been generated during
99/// request processing. Logs can be retrieved and processed using the `pop` method.
100pub struct EventQueue(wit::EventQueue);
101
102impl From<wit::EventQueue> for EventQueue {
103    fn from(value: wit::EventQueue) -> Self {
104        Self(value)
105    }
106}
107
108impl EventQueue {
109    /// Retrieves and removes the next log entry from the queue.
110    pub fn pop(&self) -> Option<Event> {
111        self.0.pop().map(Into::into)
112    }
113}
114
115/// Represents different types of event queue entries.
116///
117/// This enum categorizes the various types of operations and requests that can be
118/// logged in the event queue system.
119pub enum Event {
120    /// A GraphQL operation that was executed.
121    Operation(ExecutedOperation),
122    /// A request made to a subgraph.
123    Subgraph(ExecutedSubgraphRequest),
124    /// An HTTP request that was executed.
125    Http(ExecutedHttpRequest),
126    /// A custom extension log entry with serialized data.
127    Extension(ExtensionEvent),
128}
129
130impl From<wit::Event> for Event {
131    fn from(value: wit::Event) -> Self {
132        match value {
133            wit::Event::Operation(executed_operation) => Self::Operation(executed_operation.into()),
134            wit::Event::Subgraph(executed_subgraph_request) => Self::Subgraph(executed_subgraph_request.into()),
135            wit::Event::Http(executed_http_request) => Self::Http(executed_http_request.into()),
136            wit::Event::Extension(items) => Self::Extension(ExtensionEvent(items)),
137        }
138    }
139}
140
141/// Represents an executed GraphQL operation with detailed metrics.
142pub struct ExecutedOperation(wit::ExecutedOperation);
143
144impl From<wit::ExecutedOperation> for ExecutedOperation {
145    fn from(value: wit::ExecutedOperation) -> Self {
146        Self(value)
147    }
148}
149
150impl ExecutedOperation {
151    /// Returns the name of the GraphQL operation, if available.
152    pub fn name(&self) -> Option<&str> {
153        self.0.name.as_deref()
154    }
155
156    /// Returns the GraphQL document (query/mutation/subscription) that was executed.
157    /// The operation is in normalized form, with all possible user data removed.
158    pub fn document(&self) -> &str {
159        &self.0.document
160    }
161
162    /// Returns the duration spent preparing the operation for execution.
163    ///
164    /// This includes parsing, validation, and query planning time.
165    pub fn prepare_duration(&self) -> Duration {
166        Duration::from_nanos(self.0.prepare_duration_ns)
167    }
168
169    /// Returns the total duration of the operation execution.
170    ///
171    /// This includes the actual execution time after preparation.
172    pub fn duration(&self) -> Duration {
173        Duration::from_nanos(self.0.duration_ns)
174    }
175
176    /// Indicates whether a cached execution plan was used for this operation.
177    pub fn cached_plan(&self) -> bool {
178        self.0.cached_plan
179    }
180
181    /// Returns the status of the GraphQL response.
182    pub fn status(&self) -> GraphqlResponseStatus {
183        self.0.status.into()
184    }
185
186    /// Returns the type of GraphQL operation that was executed.
187    pub fn operation_type(&self) -> OperationType {
188        self.0.operation_type.into()
189    }
190
191    /// The complexity represents the computational cost of executing the operation.
192    ///
193    /// Read more: <https://grafbase.com/docs/gateway/configuration/complexity-control>
194    pub fn complexity(&self) -> Option<u64> {
195        self.0.complexity
196    }
197
198    /// Indicates whether the operation used any deprecated fields.
199    ///
200    /// This returns `true` if the GraphQL operation accessed any fields
201    /// that have been marked as deprecated in the schema.
202    pub fn has_deprecated_fields(&self) -> bool {
203        self.0.has_deprecated_fields
204    }
205}
206
207/// Represents the type of GraphQL operation.
208///
209/// This enum categorizes the different types of GraphQL operations
210/// that can be executed.
211pub enum OperationType {
212    /// A GraphQL query operation for reading data.
213    Query,
214    /// A GraphQL mutation operation for modifying data.
215    Mutation,
216    /// A GraphQL subscription operation for real-time data streaming.
217    Subscription,
218}
219
220impl From<wit::OperationType> for OperationType {
221    fn from(value: wit::OperationType) -> Self {
222        match value {
223            wit::OperationType::Query => OperationType::Query,
224            wit::OperationType::Mutation => OperationType::Mutation,
225            wit::OperationType::Subscription => OperationType::Subscription,
226        }
227    }
228}
229
230/// Represents the status of a GraphQL response.
231///
232/// This enum categorizes the different outcomes of a GraphQL operation execution.
233#[derive(serde::Serialize, Debug)]
234pub enum GraphqlResponseStatus {
235    /// The operation completed successfully without errors.
236    Success,
237    /// The operation completed but encountered field-level errors.
238    FieldError(FieldError),
239    /// The operation failed due to request-level errors.
240    RequestError(RequestError),
241    /// The request was refused before execution (e.g., due to authentication or rate limiting).
242    RefusedRequest,
243}
244
245/// Contains information about field-level errors in a GraphQL response.
246#[derive(serde::Serialize, Debug)]
247pub struct FieldError {
248    /// The number of field errors encountered.
249    pub count: u64,
250    /// Indicates whether the data field in the response is null.
251    pub data_is_null: bool,
252}
253
254/// Contains information about request-level errors in a GraphQL response.
255#[derive(serde::Serialize, Debug)]
256pub struct RequestError {
257    /// The number of request errors encountered.
258    pub count: u64,
259}
260
261impl From<wit::GraphqlResponseStatus> for GraphqlResponseStatus {
262    fn from(value: wit::GraphqlResponseStatus) -> Self {
263        match value {
264            wit::GraphqlResponseStatus::Success => GraphqlResponseStatus::Success,
265            wit::GraphqlResponseStatus::FieldError(wit::FieldError { count, data_is_null }) => {
266                GraphqlResponseStatus::FieldError(FieldError { count, data_is_null })
267            }
268            wit::GraphqlResponseStatus::RequestError(wit::RequestError { count }) => {
269                GraphqlResponseStatus::RequestError(RequestError { count })
270            }
271            wit::GraphqlResponseStatus::RefusedRequest => GraphqlResponseStatus::RefusedRequest,
272        }
273    }
274}
275
276/// Represents a request made to a subgraph with detailed execution information.
277///
278/// This struct contains comprehensive information about a subgraph request,
279/// including retry attempts, caching status, and timing metrics.
280pub struct ExecutedSubgraphRequest(wit::ExecutedSubgraphRequest);
281
282impl ExecutedSubgraphRequest {
283    /// Returns the name of the subgraph that was queried.
284    pub fn subgraph_name(&self) -> &str {
285        &self.0.subgraph_name
286    }
287
288    /// Returns the HTTP method used for the subgraph request.
289    pub fn method(&self) -> http::Method {
290        self.0.method.into()
291    }
292
293    /// Returns the URL of the subgraph endpoint.
294    pub fn url(&self) -> &str {
295        &self.0.url
296    }
297
298    /// Returns an iterator over all execution attempts for this subgraph request.
299    ///
300    /// This includes both successful responses and various types of failures
301    /// (e.g., rate limiting, server errors).
302    pub fn executions(&self) -> impl Iterator<Item = RequestExecution<'_>> {
303        self.0.executions.iter().map(RequestExecution::from)
304    }
305
306    /// Returns the cache status for this subgraph request.
307    pub fn cache_status(&self) -> CacheStatus {
308        self.0.cache_status.into()
309    }
310
311    /// Returns the total duration of all execution attempts.
312    pub fn total_duration(&self) -> Duration {
313        Duration::from_nanos(self.0.total_duration_ns)
314    }
315
316    /// Indicates whether any errors were encountered during the subgraph request.
317    pub fn has_errors(&self) -> bool {
318        self.0.has_errors
319    }
320}
321
322impl From<wit::ExecutedSubgraphRequest> for ExecutedSubgraphRequest {
323    fn from(value: wit::ExecutedSubgraphRequest) -> Self {
324        Self(value)
325    }
326}
327
328/// Represents a single execution attempt of a subgraph request.
329///
330/// This enum captures the different outcomes of attempting to execute a request
331/// to a subgraph endpoint.
332pub enum RequestExecution<'a> {
333    /// The subgraph returned a 5xx server error.
334    InternalServerError,
335    /// The request failed due to network or other request-level errors.
336    RequestError,
337    /// The request was rate limited by the engine rate limiter.
338    RateLimited,
339    /// The subgraph returned a response (which may still contain GraphQL errors).
340    Response(SubgraphResponse<'a>),
341}
342
343impl<'a> From<&'a wit::SubgraphRequestExecutionKind> for RequestExecution<'a> {
344    fn from(value: &'a wit::SubgraphRequestExecutionKind) -> Self {
345        match value {
346            wit::SubgraphRequestExecutionKind::InternalServerError => Self::InternalServerError,
347            wit::SubgraphRequestExecutionKind::RequestError => Self::RequestError,
348            wit::SubgraphRequestExecutionKind::RateLimited => Self::RateLimited,
349            wit::SubgraphRequestExecutionKind::Response(subgraph_response) => {
350                Self::Response(SubgraphResponse(subgraph_response))
351            }
352        }
353    }
354}
355
356/// Contains timing and status information for a successful subgraph response.
357pub struct SubgraphResponse<'a>(&'a wit::SubgraphResponse);
358
359impl<'a> SubgraphResponse<'a> {
360    /// Returns the time taken to establish a connection to the subgraph.
361    pub fn connection_time(&self) -> Duration {
362        Duration::from_nanos(self.0.connection_time_ns)
363    }
364
365    /// Returns the time taken to receive the complete response from the subgraph.
366    pub fn response_time(&self) -> Duration {
367        Duration::from_nanos(self.0.response_time_ns)
368    }
369
370    /// Returns the HTTP status code of the subgraph response.
371    pub fn status(&self) -> http::StatusCode {
372        http::StatusCode::from_u16(self.0.status_code).expect("this comes from reqwest")
373    }
374
375    /// Returns the HTTP response headers from the subgraph.
376    pub fn response_headers(&self) -> SharedHttpHeaders<'a> {
377        SharedHttpHeaders::from(&self.0.response_headers)
378    }
379}
380
381/// Represents the cache status of a subgraph request.
382#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
383pub enum CacheStatus {
384    /// The entire response was served from cache.
385    Hit,
386    /// Part of the response was served from cache, but some data required fetching.
387    PartialHit,
388    /// No cached data was available; the entire response was fetched from the subgraph.
389    Miss,
390}
391
392impl From<wit::CacheStatus> for CacheStatus {
393    fn from(value: wit::CacheStatus) -> Self {
394        match value {
395            wit::CacheStatus::Hit => Self::Hit,
396            wit::CacheStatus::PartialHit => Self::PartialHit,
397            wit::CacheStatus::Miss => Self::Miss,
398        }
399    }
400}
401
402impl CacheStatus {
403    /// Returns the cache status as a string slice.
404    pub fn as_str(&self) -> &'static str {
405        match self {
406            Self::Hit => "hit",
407            Self::PartialHit => "partial_hit",
408            Self::Miss => "miss",
409        }
410    }
411}
412
413impl AsRef<str> for CacheStatus {
414    fn as_ref(&self) -> &str {
415        self.as_str()
416    }
417}
418
419/// Represents an HTTP request that was executed.
420///
421/// This struct contains information about non-GraphQL HTTP requests made by the system.
422pub struct ExecutedHttpRequest(wit::ExecutedHttpRequest);
423
424impl ExecutedHttpRequest {
425    /// Returns the HTTP method used for the request.
426    ///
427    /// # Returns
428    ///
429    /// An `http::Method` representing the HTTP method (GET, POST, etc.).
430    pub fn method(&self) -> http::Method {
431        self.0.method.into()
432    }
433
434    /// Returns the URL of the HTTP request.
435    ///
436    /// # Returns
437    ///
438    /// The full URL as a string slice.
439    pub fn url(&self) -> &str {
440        &self.0.url
441    }
442
443    /// Returns the HTTP status code of the response.
444    ///
445    /// # Returns
446    ///
447    /// An `http::StatusCode` representing the response status.
448    pub fn response_status(&self) -> http::StatusCode {
449        http::StatusCode::from_u16(self.0.status_code).expect("this comes from engine")
450    }
451}
452
453impl From<wit::ExecutedHttpRequest> for ExecutedHttpRequest {
454    fn from(value: wit::ExecutedHttpRequest) -> Self {
455        Self(value)
456    }
457}
458
459/// Represents a custom extension log entry with serialized data.
460///
461/// Extension logs allow custom data to be included in the event queue stream.
462/// The data is serialized using CBOR format and can be deserialized into
463/// the appropriate type.
464pub struct ExtensionEvent(wit::ExtensionEvent);
465
466impl ExtensionEvent {
467    /// Event name
468    pub fn event_name(&self) -> &str {
469        &self.0.event_name
470    }
471
472    /// Extension name which produced this event
473    pub fn extension_name(&self) -> &str {
474        &self.0.extension_name
475    }
476
477    /// Deserializes the extension log data into the specified type.
478    ///
479    /// # Type Parameters
480    ///
481    /// * `T` - The type to deserialize into. Must implement `serde::Deserialize`.
482    ///
483    /// # Returns
484    ///
485    /// Returns `Ok(T)` with the deserialized data on success, or an `SdkError` if
486    /// deserialization fails.
487    ///
488    /// # Example
489    ///
490    /// ```ignore
491    /// use serde::Deserialize;
492    /// use grafbase_sdk::host_io::event_queue::Event;
493    ///
494    /// #[derive(Deserialize)]
495    /// struct CustomLog {
496    ///     user_id: String,
497    ///     action: String,
498    /// }
499    ///
500    /// // Assuming we have an ExtensionLogEntry
501    /// let log_entry: ExtensionEvent = // ... obtained from elsewhere
502    /// # todo!();
503    ///
504    /// match log_entry.deserialize::<CustomLog>() {
505    ///     Ok(custom_log) => {
506    ///         println!("User {} performed: {}", custom_log.user_id, custom_log.action);
507    ///     }
508    ///     Err(e) => {
509    ///         eprintln!("Failed to deserialize log: {:?}", e);
510    ///     }
511    /// }
512    /// ```
513    pub fn deserialize<'de, T>(&'de self) -> Result<T, SdkError>
514    where
515        T: serde::Deserialize<'de>,
516    {
517        let data = minicbor_serde::from_slice(&self.0.data)?;
518
519        Ok(data)
520    }
521}