grafbase_sdk/host_io/
event_queue.rs

1//! Event queuing functionality for tracking and recording request events.
2//!
3//! This module provides comprehensive event queuing capabilities for tracking various
4//! types of operations and requests within the Grafbase Gateway system. It supports tracking
5//! of GraphQL operations, subgraph requests, HTTP requests, and custom extension logs.
6//!
7//! # Overview
8//!
9//! The event queue system is designed to capture detailed information about:
10//! - GraphQL operation execution (including timing, caching, and status)
11//! - Subgraph request details (including retries, caching, and response times)
12//! - HTTP request execution
13//! - Custom extension logs with serializable data
14//!
15//! # Example
16//!
17//! ```no_run
18//! use grafbase_sdk::host_io::event_queue;
19//! use serde::Serialize;
20//!
21//! #[derive(Serialize)]
22//! struct CustomEvent {
23//!     user_id: String,
24//!     action: String,
25//!     timestamp: u64,
26//! }
27//!
28//! // Send a custom event
29//! let log = CustomEvent {
30//!     user_id: "user123".to_string(),
31//!     action: "query_execution".to_string(),
32//!     timestamp: 1234567890,
33//! };
34//!
35//! event_queue::send("custom_event", log).expect("Failed to send event");
36//! ```
37//!
38//! # Log Aggregation
39//!
40//! By itself, event queue calls do nothing in the Grafbase Gateway. You must implement
41//! an [`Hosts`] type of an extension with event filtering, which will be called after
42//! a response is sent back to the user.
43
44use std::time::Duration;
45
46use crate::{SdkError, types::Headers, wit};
47
48/// Sends an event queue entry to the system.
49///
50/// This function serializes the provided log data and sends it to the event queue
51/// system. The log data can be any type that implements `serde::Serialize`.
52///
53/// # Arguments
54///
55/// * `name` - The name of the event to be logged. Used in event filtering.
56/// * `data` - The log data to be sent. Must implement `serde::Serialize`.
57///
58/// # Returns
59///
60/// Returns `Ok(())` on success, or an `SdkError` if serialization fails.
61///
62/// # Example
63///
64/// ```no_run
65/// use serde::Serialize;
66/// use grafbase_sdk::host_io::event_queue;
67///
68/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
69/// #[derive(Serialize)]
70/// struct UserAction<'a> {
71///     action: &'a str,
72///     user_id: &'a str,
73/// }
74///
75/// let action = UserAction {
76///     action: "login",
77///     user_id: "user123"
78/// };
79///
80/// event_queue::send("user_action", action)?;
81/// # Ok(())
82/// # }
83/// ```
84pub fn send<T>(name: &str, data: T) -> Result<(), SdkError>
85where
86    T: serde::Serialize,
87{
88    if !crate::component::can_skip_sending_events() {
89        let data = minicbor_serde::to_vec(data)?;
90        crate::component::queue_event(name, &data);
91    }
92
93    Ok(())
94}
95
96/// A queue of event queue per request from the engine.
97///
98/// This struct provides access to event queue that have been generated during
99/// request processing. Logs can be retrieved and processed using the `pop` method.
100pub struct EventQueue(wit::EventQueue);
101
102impl From<wit::EventQueue> for EventQueue {
103    fn from(value: wit::EventQueue) -> Self {
104        Self(value)
105    }
106}
107
108impl EventQueue {
109    /// Retrieves and removes the next log entry from the queue.
110    pub fn pop(&self) -> Option<Event> {
111        self.0.pop().map(Into::into)
112    }
113}
114
115/// Represents different types of event queue entries.
116///
117/// This enum categorizes the various types of operations and requests that can be
118/// logged in the event queue system.
119#[non_exhaustive]
120pub enum Event {
121    /// A GraphQL operation that was executed.
122    Operation(ExecutedOperation),
123    /// A request made to a subgraph.
124    Subgraph(ExecutedSubgraphRequest),
125    /// An HTTP request that was executed.
126    Http(ExecutedHttpRequest),
127    /// A custom extension log entry with serialized data.
128    Extension(ExtensionEvent),
129}
130
131impl From<wit::Event> for Event {
132    fn from(value: wit::Event) -> Self {
133        match value {
134            wit::Event::Operation(executed_operation) => Self::Operation(executed_operation.into()),
135            wit::Event::Subgraph(executed_subgraph_request) => Self::Subgraph(executed_subgraph_request.into()),
136            wit::Event::Http(executed_http_request) => Self::Http(executed_http_request.into()),
137            wit::Event::Extension(event) => Self::Extension(event.into()),
138        }
139    }
140}
141
142/// Represents an executed GraphQL operation with detailed metrics.
143#[non_exhaustive]
144pub struct ExecutedOperation {
145    /// The name of the GraphQL operation, if available.
146    pub name: Option<String>,
147    /// The GraphQL document (query/mutation/subscription) that was executed.
148    /// The operation is in normalized form, with all possible user data removed.
149    pub document: String,
150    /// The duration spent preparing the operation for execution.
151    /// This includes parsing, validation, and query planning time.
152    pub prepare_duration: Duration,
153    /// The total duration of the operation execution.
154    /// This includes the actual execution time and the preparation.
155    pub duration: Duration,
156    /// Indicates whether a cached execution plan was used for this operation.
157    pub cached_plan: bool,
158    /// The status of the GraphQL response.
159    pub status: GraphqlResponseStatus,
160    /// The type of GraphQL operation that was executed.
161    pub operation_type: OperationType,
162    /// The complexity represents the computational cost of executing the operation.
163    /// Read more: <https://grafbase.com/docs/gateway/configuration/complexity-control>
164    pub complexity: Option<u64>,
165    /// Indicates whether the operation used any deprecated fields.
166    pub has_deprecated_fields: bool,
167}
168
169impl From<wit::ExecutedOperation> for ExecutedOperation {
170    fn from(value: wit::ExecutedOperation) -> Self {
171        ExecutedOperation {
172            name: value.name,
173            document: value.document,
174            prepare_duration: Duration::from_nanos(value.prepare_duration_ns),
175            duration: Duration::from_nanos(value.duration_ns),
176            cached_plan: value.cached_plan,
177            status: value.status.into(),
178            operation_type: value.operation_type.into(),
179            complexity: value.complexity,
180            has_deprecated_fields: value.has_deprecated_fields,
181        }
182    }
183}
184
185/// Represents the type of GraphQL operation.
186///
187/// This enum categorizes the different types of GraphQL operations
188/// that can be executed.
189#[derive(Debug, Clone, Copy, PartialEq, Eq)]
190pub enum OperationType {
191    /// A GraphQL query operation for reading data.
192    Query,
193    /// A GraphQL mutation operation for modifying data.
194    Mutation,
195    /// A GraphQL subscription operation for real-time data streaming.
196    Subscription,
197}
198
199impl From<wit::OperationType> for OperationType {
200    fn from(value: wit::OperationType) -> Self {
201        match value {
202            wit::OperationType::Query => OperationType::Query,
203            wit::OperationType::Mutation => OperationType::Mutation,
204            wit::OperationType::Subscription => OperationType::Subscription,
205        }
206    }
207}
208
209/// Represents the status of a GraphQL response.
210///
211/// This enum categorizes the different outcomes of a GraphQL operation execution.
212#[derive(serde::Serialize, Debug, Clone)]
213pub enum GraphqlResponseStatus {
214    /// The operation completed successfully without errors.
215    Success,
216    /// The operation completed but encountered field-level errors.
217    FieldError(FieldError),
218    /// The operation failed due to request-level errors.
219    RequestError(RequestError),
220    /// The request was refused before execution (e.g., due to authentication or rate limiting).
221    RefusedRequest,
222}
223
224/// Contains information about field-level errors in a GraphQL response.
225#[derive(serde::Serialize, Debug, Clone)]
226#[non_exhaustive]
227pub struct FieldError {
228    /// The number of field errors encountered.
229    pub count: u64,
230    /// Indicates whether the data field in the response is null.
231    pub data_is_null: bool,
232}
233
234/// Contains information about request-level errors in a GraphQL response.
235#[derive(serde::Serialize, Debug, Clone)]
236#[non_exhaustive]
237pub struct RequestError {
238    /// The number of request errors encountered.
239    pub count: u64,
240}
241
242impl From<wit::GraphqlResponseStatus> for GraphqlResponseStatus {
243    fn from(value: wit::GraphqlResponseStatus) -> Self {
244        match value {
245            wit::GraphqlResponseStatus::Success => GraphqlResponseStatus::Success,
246            wit::GraphqlResponseStatus::FieldError(wit::FieldError { count, data_is_null }) => {
247                GraphqlResponseStatus::FieldError(FieldError { count, data_is_null })
248            }
249            wit::GraphqlResponseStatus::RequestError(wit::RequestError { count }) => {
250                GraphqlResponseStatus::RequestError(RequestError { count })
251            }
252            wit::GraphqlResponseStatus::RefusedRequest => GraphqlResponseStatus::RefusedRequest,
253        }
254    }
255}
256
257/// Represents a request made to a subgraph with detailed execution information.
258///
259/// This struct contains comprehensive information about a subgraph request,
260/// including retry attempts, caching status, and timing metrics.
261#[non_exhaustive]
262pub struct ExecutedSubgraphRequest {
263    /// The name of the subgraph that was queried.
264    pub subgraph_name: String,
265    /// The HTTP method used for the subgraph request (e.g., GET, POST).
266    pub method: http::Method,
267    /// The URL of the subgraph endpoint that was queried.
268    pub url: String,
269    /// The cache status of the subgraph request.
270    pub cache_status: CacheStatus,
271    /// The total duration of all execution attempts for this subgraph request.
272    pub total_duration: Duration,
273    /// Indicates whether any errors were encountered during the subgraph request.
274    pub has_errors: bool,
275    executions: Vec<wit::SubgraphRequestExecutionKind>,
276}
277
278impl ExecutedSubgraphRequest {
279    /// Returns an iterator over all execution attempts for this subgraph request.
280    ///
281    /// This includes both successful responses and various types of failures
282    /// (e.g., rate limiting, server errors).
283    pub fn into_executions(self) -> impl Iterator<Item = RequestExecution> {
284        self.executions.into_iter().map(RequestExecution::from)
285    }
286}
287
288impl From<wit::ExecutedSubgraphRequest> for ExecutedSubgraphRequest {
289    fn from(value: wit::ExecutedSubgraphRequest) -> Self {
290        Self {
291            subgraph_name: value.subgraph_name,
292            method: value.method.into(),
293            url: value.url,
294            cache_status: value.cache_status.into(),
295            total_duration: Duration::from_nanos(value.total_duration_ns),
296            has_errors: value.has_errors,
297            executions: value.executions,
298        }
299    }
300}
301
302/// Represents a single execution attempt of a subgraph request.
303///
304/// This enum captures the different outcomes of attempting to execute a request
305/// to a subgraph endpoint.
306#[non_exhaustive]
307pub enum RequestExecution {
308    /// The subgraph returned a 5xx server error.
309    InternalServerError,
310    /// The request failed due to network or other request-level errors.
311    RequestError,
312    /// The request was rate limited by the engine rate limiter.
313    RateLimited,
314    /// The subgraph returned a response (which may still contain GraphQL errors).
315    Response(SubgraphResponse),
316}
317
318impl From<wit::SubgraphRequestExecutionKind> for RequestExecution {
319    fn from(value: wit::SubgraphRequestExecutionKind) -> Self {
320        match value {
321            wit::SubgraphRequestExecutionKind::InternalServerError => Self::InternalServerError,
322            wit::SubgraphRequestExecutionKind::RequestError => Self::RequestError,
323            wit::SubgraphRequestExecutionKind::RateLimited => Self::RateLimited,
324            wit::SubgraphRequestExecutionKind::Response(subgraph_response) => Self::Response(subgraph_response.into()),
325        }
326    }
327}
328
329/// Contains timing and status information for a successful subgraph response.
330#[non_exhaustive]
331pub struct SubgraphResponse {
332    /// The time taken to establish a connection to the subgraph.
333    pub connection_time: Duration,
334    /// The time taken to receive the complete response from the subgraph.
335    pub response_time: Duration,
336    /// The HTTP status code of the subgraph response.
337    pub status_code: http::StatusCode,
338    /// The HTTP response headers from the subgraph.
339    pub response_headers: Headers,
340}
341
342impl From<wit::SubgraphResponse> for SubgraphResponse {
343    fn from(value: wit::SubgraphResponse) -> Self {
344        Self {
345            connection_time: Duration::from_nanos(value.connection_time_ns),
346            response_time: Duration::from_nanos(value.response_time_ns),
347            status_code: http::StatusCode::from_u16(value.status_code).expect("Gateway provides a valid status code"),
348            response_headers: value.response_headers.into(),
349        }
350    }
351}
352
353/// Represents the cache status of a subgraph request.
354#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
355pub enum CacheStatus {
356    /// The entire response was served from cache.
357    Hit,
358    /// Part of the response was served from cache, but some data required fetching.
359    PartialHit,
360    /// No cached data was available; the entire response was fetched from the subgraph.
361    Miss,
362}
363
364impl From<wit::CacheStatus> for CacheStatus {
365    fn from(value: wit::CacheStatus) -> Self {
366        match value {
367            wit::CacheStatus::Hit => Self::Hit,
368            wit::CacheStatus::PartialHit => Self::PartialHit,
369            wit::CacheStatus::Miss => Self::Miss,
370        }
371    }
372}
373
374impl CacheStatus {
375    /// Returns the cache status as a string slice.
376    pub fn as_str(&self) -> &'static str {
377        match self {
378            Self::Hit => "hit",
379            Self::PartialHit => "partial_hit",
380            Self::Miss => "miss",
381        }
382    }
383}
384
385impl AsRef<str> for CacheStatus {
386    fn as_ref(&self) -> &str {
387        self.as_str()
388    }
389}
390
391/// Represents an HTTP request that was executed.
392///
393/// This struct contains information about non-GraphQL HTTP requests made by the system.
394#[non_exhaustive]
395pub struct ExecutedHttpRequest {
396    /// An `http::StatusCode` representing the response status.
397    pub status_code: http::StatusCode,
398    /// The HTTP method used for the request.
399    pub method: http::Method,
400    /// The full URL as a string slice.
401    pub url: String,
402}
403
404impl From<wit::ExecutedHttpRequest> for ExecutedHttpRequest {
405    fn from(value: wit::ExecutedHttpRequest) -> Self {
406        Self {
407            status_code: http::StatusCode::from_u16(value.status_code).expect("Gateway provides a valid status code"),
408            method: value.method.into(),
409            url: value.url,
410        }
411    }
412}
413
414/// Represents a custom extension log entry with serialized data.
415///
416/// Extension logs allow custom data to be included in the event queue stream.
417/// The data is serialized using CBOR format and can be deserialized into
418/// the appropriate type.
419#[non_exhaustive]
420pub struct ExtensionEvent {
421    /// Event name
422    pub event_name: String,
423    /// Extension name which produced this event
424    pub extension_name: String,
425    data: Vec<u8>,
426}
427
428impl ExtensionEvent {
429    /// Deserializes the extension log data into the specified type.
430    ///
431    /// # Type Parameters
432    ///
433    /// * `T` - The type to deserialize into. Must implement `serde::Deserialize`.
434    ///
435    /// # Returns
436    ///
437    /// Returns `Ok(T)` with the deserialized data on success, or an `SdkError` if
438    /// deserialization fails.
439    ///
440    /// # Example
441    ///
442    /// ```ignore
443    /// use serde::Deserialize;
444    /// use grafbase_sdk::host_io::event_queue::Event;
445    ///
446    /// #[derive(Deserialize)]
447    /// struct CustomLog {
448    ///     user_id: String,
449    ///     action: String,
450    /// }
451    ///
452    /// // Assuming we have an ExtensionLogEntry
453    /// let log_entry: ExtensionEvent = // ... obtained from elsewhere
454    /// # todo!();
455    ///
456    /// match log_entry.deserialize::<CustomLog>() {
457    ///     Ok(custom_log) => {
458    ///         println!("User {} performed: {}", custom_log.user_id, custom_log.action);
459    ///     }
460    ///     Err(e) => {
461    ///         eprintln!("Failed to deserialize log: {:?}", e);
462    ///     }
463    /// }
464    /// ```
465    pub fn deserialize<'de, T>(&'de self) -> Result<T, SdkError>
466    where
467        T: serde::Deserialize<'de>,
468    {
469        let data = minicbor_serde::from_slice(&self.data)?;
470
471        Ok(data)
472    }
473}
474
475impl From<wit::ExtensionEvent> for ExtensionEvent {
476    fn from(value: wit::ExtensionEvent) -> Self {
477        Self {
478            event_name: value.event_name,
479            extension_name: value.extension_name,
480            data: value.data,
481        }
482    }
483}