1use async_trait::async_trait;
7use jsonrpsee::{
8 PendingSubscriptionSink, SubscriptionSink,
9 core::{RpcResult, SubscriptionResult},
10 types::ErrorCode,
11};
12use kona_engine::{EngineQueries, EngineQuerySender};
13
14use crate::DevEngineApiServer;
15use jsonrpsee::core::to_json_raw_value;
16
17#[derive(Debug)]
19pub struct DevEngineRpc {
20 engine_query_sender: EngineQuerySender,
22}
23
24impl DevEngineRpc {
25 pub const fn new(engine_query_sender: EngineQuerySender) -> Self {
27 Self { engine_query_sender }
28 }
29
30 async fn engine_queue_length_watcher(
32 &self,
33 ) -> Result<tokio::sync::watch::Receiver<usize>, jsonrpsee::core::SubscriptionError> {
34 let (query_tx, query_rx) = tokio::sync::oneshot::channel();
35
36 if let Err(e) =
37 self.engine_query_sender.send(EngineQueries::QueueLengthReceiver(query_tx)).await
38 {
39 tracing::warn!(target: "rpc::dev", ?e, "Failed to send engine state receiver query. The engine query handler is likely closed.");
40 return Err(jsonrpsee::core::SubscriptionError::from(
41 "Internal error. Failed to send engine state receiver query. The engine query handler is likely closed.",
42 ));
43 }
44
45 query_rx.await.map_err(|_| jsonrpsee::core::SubscriptionError::from("Internal error. Failed to receive engine task receiver query. The engine query handler is likely closed."))
46 }
47
48 async fn send_queue_length_update(
49 sink: &SubscriptionSink,
50 queue_length: &usize,
51 ) -> Result<(), jsonrpsee::core::SubscriptionError> {
52 sink.send(to_json_raw_value(queue_length).map_err(|_| {
53 jsonrpsee::core::SubscriptionError::from(
54 "Internal error. Impossible to convert engine queue length to json",
55 )
56 })?)
57 .await
58 .map_err(|_| {
59 jsonrpsee::core::SubscriptionError::from(
60 "Failed to send engine queue length update. Subscription likely dropped.",
61 )
62 })
63 }
64}
65
66#[async_trait]
67impl DevEngineApiServer for DevEngineRpc {
68 async fn dev_subscribe_engine_queue_length(
69 &self,
70 sink: PendingSubscriptionSink,
71 ) -> SubscriptionResult {
72 let sink = sink.accept().await?;
73
74 let mut subscription = self.engine_queue_length_watcher().await?;
75
76 let mut current_queue_length = *subscription.borrow();
77
78 Self::send_queue_length_update(&sink, ¤t_queue_length).await?;
79
80 while let Ok(new_queue_length) = subscription
81 .wait_for(|queue_length| queue_length != ¤t_queue_length)
82 .await
83 .map(|state| *state)
84 {
85 Self::send_queue_length_update(&sink, &new_queue_length).await?;
86 current_queue_length = new_queue_length;
87 }
88
89 tracing::warn!(target: "rpc::dev::engine_queue_size", "Subscription to engine queue size has been closed.");
90 Ok(())
91 }
92
93 async fn dev_task_queue_length(&self) -> RpcResult<usize> {
94 let (query_tx, query_rx) = tokio::sync::oneshot::channel();
95
96 self.engine_query_sender.send(EngineQueries::TaskQueueLength(query_tx)).await.map_err(
97 |_| {
98 jsonrpsee::types::ErrorObjectOwned::owned(
99 ErrorCode::InternalError.code(),
100 "Engine query channel closed",
101 None::<()>,
102 )
103 },
104 )?;
105
106 query_rx.await.map_err(|_| {
107 jsonrpsee::types::ErrorObjectOwned::owned(
108 ErrorCode::InternalError.code(),
109 "Failed to receive task queue length",
110 None::<()>,
111 )
112 })
113 }
114}