kona_rpc/
dev.rs

1//! Development RPC API for exposing internal engine state and task queue information.
2//!
3//! This module provides development and debugging endpoints that allow introspection
4//! of the engine's internal state, task queue, and operations.
5
6use async_trait::async_trait;
7use jsonrpsee::{
8    PendingSubscriptionSink, SubscriptionSink,
9    core::{RpcResult, SubscriptionResult},
10    types::ErrorCode,
11};
12use kona_engine::{EngineQueries, EngineQuerySender};
13
14use crate::DevEngineApiServer;
15use jsonrpsee::core::to_json_raw_value;
16
17/// Implementation of the development RPC API.
18#[derive(Debug)]
19pub struct DevEngineRpc {
20    /// The engine query sender.
21    engine_query_sender: EngineQuerySender,
22}
23
24impl DevEngineRpc {
25    /// Creates a new [`DevEngineRpc`] instance.
26    pub const fn new(engine_query_sender: EngineQuerySender) -> Self {
27        Self { engine_query_sender }
28    }
29
30    /// Gets an engine queue length watcher for subscriptions.
31    async fn engine_queue_length_watcher(
32        &self,
33    ) -> Result<tokio::sync::watch::Receiver<usize>, jsonrpsee::core::SubscriptionError> {
34        let (query_tx, query_rx) = tokio::sync::oneshot::channel();
35
36        if let Err(e) =
37            self.engine_query_sender.send(EngineQueries::QueueLengthReceiver(query_tx)).await
38        {
39            tracing::warn!(target: "rpc::dev", ?e, "Failed to send engine state receiver query. The engine query handler is likely closed.");
40            return Err(jsonrpsee::core::SubscriptionError::from(
41                "Internal error. Failed to send engine state receiver query. The engine query handler is likely closed.",
42            ));
43        }
44
45        query_rx.await.map_err(|_| jsonrpsee::core::SubscriptionError::from("Internal error. Failed to receive engine task receiver query. The engine query handler is likely closed."))
46    }
47
48    async fn send_queue_length_update(
49        sink: &SubscriptionSink,
50        queue_length: &usize,
51    ) -> Result<(), jsonrpsee::core::SubscriptionError> {
52        sink.send(to_json_raw_value(queue_length).map_err(|_| {
53            jsonrpsee::core::SubscriptionError::from(
54                "Internal error. Impossible to convert engine queue length to json",
55            )
56        })?)
57        .await
58        .map_err(|_| {
59            jsonrpsee::core::SubscriptionError::from(
60                "Failed to send engine queue length update. Subscription likely dropped.",
61            )
62        })
63    }
64}
65
66#[async_trait]
67impl DevEngineApiServer for DevEngineRpc {
68    async fn dev_subscribe_engine_queue_length(
69        &self,
70        sink: PendingSubscriptionSink,
71    ) -> SubscriptionResult {
72        let sink = sink.accept().await?;
73
74        let mut subscription = self.engine_queue_length_watcher().await?;
75
76        let mut current_queue_length = *subscription.borrow();
77
78        Self::send_queue_length_update(&sink, &current_queue_length).await?;
79
80        while let Ok(new_queue_length) = subscription
81            .wait_for(|queue_length| queue_length != &current_queue_length)
82            .await
83            .map(|state| *state)
84        {
85            Self::send_queue_length_update(&sink, &new_queue_length).await?;
86            current_queue_length = new_queue_length;
87        }
88
89        tracing::warn!(target: "rpc::dev::engine_queue_size", "Subscription to engine queue size has been closed.");
90        Ok(())
91    }
92
93    async fn dev_task_queue_length(&self) -> RpcResult<usize> {
94        let (query_tx, query_rx) = tokio::sync::oneshot::channel();
95
96        self.engine_query_sender.send(EngineQueries::TaskQueueLength(query_tx)).await.map_err(
97            |_| {
98                jsonrpsee::types::ErrorObjectOwned::owned(
99                    ErrorCode::InternalError.code(),
100                    "Engine query channel closed",
101                    None::<()>,
102                )
103            },
104        )?;
105
106        query_rx.await.map_err(|_| {
107            jsonrpsee::types::ErrorObjectOwned::owned(
108                ErrorCode::InternalError.code(),
109                "Failed to receive task queue length",
110                None::<()>,
111            )
112        })
113    }
114}