Skip to main content

tycho_rpc/endpoint/jrpc/
mod.rs

1use std::borrow::Cow;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::sync::OnceLock;
5
6use axum::extract::State;
7use axum::response::{IntoResponse, Response};
8use base64::prelude::{BASE64_STANDARD, Engine as _};
9use serde::{Deserialize, Serialize};
10use serde_json::value::RawValue;
11use tycho_block_util::message::{ExtMsgRepr, validate_external_message};
12use tycho_types::models::*;
13use tycho_types::prelude::*;
14use tycho_util::metrics::HistogramGuard;
15use tycho_util::serde_helpers::{self, Base64BytesWithLimit};
16
17pub use self::cache::JrpcEndpointCache;
18pub use self::stream::{StreamContext, SubscriptionsState, stream_route, stream_router};
19use self::stream::{
20    SubscribeAction, SubscriptionEmptyRequest, SubscriptionUpdateRequest, handle_list,
21    handle_status, handle_sub, handle_unsub_all,
22};
23use crate::models::{GenTimings, LastTransactionId};
24use crate::state::{
25    CodeHashesIter, LoadedAccountState, RpcState, RpcStateError, TransactionsIterBuilder,
26};
27use crate::util::error_codes::*;
28use crate::util::jrpc_extractor::{
29    Jrpc, JrpcErrorResponse, JrpcOkResponse, RfcBehaviour, declare_jrpc_method,
30};
31
32mod cache;
33mod stream;
34
35declare_jrpc_method! {
36    pub enum MethodParams: Method {
37        GetCapabilities(EmptyParams),
38        GetLatestKeyBlock(EmptyParams),
39        GetBlockchainConfig(EmptyParams),
40        GetStatus(EmptyParams),
41        GetTimings(EmptyParams),
42        SendMessage(SendMessageRequest),
43        GetContractState(GetContractStateRequest),
44        GetLibraryCell(GetLibraryCellRequest),
45        GetAccountsByCodeHash(GetAccountsByCodeHashRequest),
46        GetTransactionsList(GetTransactionsListRequest),
47        GetTransaction(GetTransactionRequest),
48        GetDstTransaction(GetDstTransactionRequest),
49        GetTransactionBlockId(GetTransactionRequest),
50        GetKeyBlockProof(GetKeyBlockProofRequest),
51        GetBlockProof(GetBlockRequest),
52        // NOTE: Temp endpoint. Must be enforced by limits and other stuff.
53        GetBlockData(GetBlockRequest),
54        SubSubscribe(SubscriptionUpdateRequest),
55        SubUnsubscribe(SubscriptionUpdateRequest),
56        SubUnsubscribeAll(SubscriptionEmptyRequest),
57        SubStatus(SubscriptionEmptyRequest),
58        SubListSubscriptions(SubscriptionEmptyRequest),
59    }
60}
61
62pub async fn route(State(state): State<RpcState>, req: Jrpc<RfcBehaviour, Method>) -> Response {
63    let label = [("method", req.method)];
64    let _hist = HistogramGuard::begin_with_labels("tycho_jrpc_request_time", &label);
65    match req.params {
66        MethodParams::GetCapabilities(_) => ok_to_response(req.id, get_capabilities(&state)),
67        MethodParams::GetLatestKeyBlock(_) => match &*state.jrpc_cache().load_latest_key_block() {
68            Some(config) => ok_to_response(req.id, config.as_ref()),
69            None => error_to_response(req.id, RpcStateError::NotReady),
70        },
71        MethodParams::GetBlockchainConfig(_) => match &*state.jrpc_cache().load_blockchain_config()
72        {
73            Some(config) => ok_to_response(req.id, config.as_ref()),
74            None => error_to_response(req.id, RpcStateError::NotReady),
75        },
76        MethodParams::GetStatus(_) => ok_to_response(req.id, GetStatusResponse {
77            ready: state.is_ready(),
78        }),
79        MethodParams::GetTimings(_) => {
80            if state.is_ready() {
81                ok_to_response(req.id, state.load_timings().as_ref())
82            } else {
83                error_to_response(req.id, RpcStateError::NotReady)
84            }
85        }
86        MethodParams::SendMessage(p) => {
87            if let Err(e) = validate_external_message(&p.message).await {
88                return JrpcErrorResponse {
89                    id: Some(req.id),
90                    code: INVALID_BOC_CODE,
91                    message: e.to_string().into(),
92                    behaviour: PhantomData::<RfcBehaviour>,
93                }
94                .into_response();
95            }
96
97            state.broadcast_external_message(&p.message).await;
98            ok_to_response(req.id, ())
99        }
100        MethodParams::GetLibraryCell(p) => {
101            let res = match state.jrpc_cache().get_library_cell_response(&p.hash) {
102                Some(value) => value,
103                None => match state.get_raw_library(&p.hash) {
104                    Ok(res) => state.jrpc_cache().insert_library_cell_response(p.hash, res),
105                    Err(e) => return error_to_response(req.id, RpcStateError::Internal(e)),
106                },
107            };
108            ok_to_response(req.id, res.as_ref())
109        }
110        MethodParams::GetContractState(p) => {
111            let item = match state.get_account_state(&p.address) {
112                Ok(item) => item,
113                Err(e) => return error_to_response(req.id, e),
114            };
115
116            let account;
117            let _mc_ref_handle;
118            ok_to_response(req.id, match item {
119                LoadedAccountState::NotFound { timings, .. } => {
120                    GetContractStateResponse::NotExists { timings }
121                }
122                LoadedAccountState::Found { state, timings, .. }
123                    if Some(state.last_trans_lt) <= p.last_transaction_lt =>
124                {
125                    GetContractStateResponse::Unchanged { timings }
126                }
127                LoadedAccountState::Found {
128                    state,
129                    timings,
130                    mc_ref_handle,
131                    ..
132                } => {
133                    _mc_ref_handle = mc_ref_handle;
134                    match state.load_account() {
135                        Ok(Some(loaded)) => {
136                            account = loaded;
137                            GetContractStateResponse::Exists {
138                                account: &account,
139                                timings,
140                                last_transaction_id: LastTransactionId {
141                                    hash: state.last_trans_hash,
142                                    lt: state.last_trans_lt,
143                                },
144                            }
145                        }
146                        Ok(None) => GetContractStateResponse::NotExists { timings },
147                        Err(e) => {
148                            return error_to_response(req.id, RpcStateError::Internal(e.into()));
149                        }
150                    }
151                }
152            })
153        }
154        MethodParams::GetAccountsByCodeHash(p) => {
155            if p.limit == 0 {
156                return JrpcOkResponse::<_, RfcBehaviour>::new(req.id, [(); 0]).into_response();
157            } else if p.limit > GetAccountsByCodeHashResponse::MAX_LIMIT {
158                return too_large_limit_response(req.id);
159            }
160            match state.get_accounts_by_code_hash(&p.code_hash, p.continuation.as_ref(), None) {
161                Ok(list) => ok_to_response(req.id, GetAccountsByCodeHashResponse {
162                    list: RefCell::new(Some(list)),
163                    limit: p.limit,
164                }),
165                Err(e) => error_to_response(req.id, e),
166            }
167        }
168        MethodParams::GetTransactionsList(p) => {
169            if p.limit == 0 {
170                return JrpcOkResponse::<_, RfcBehaviour>::new(req.id, [(); 0]).into_response();
171            } else if p.limit > GetTransactionsListResponse::MAX_LIMIT {
172                return too_large_limit_response(req.id);
173            }
174            match state.get_transactions(&p.account, None, p.last_transaction_lt, true, None) {
175                // TODO: Move serialization to a separate blocking task pool.
176                Ok(list) => ok_to_response(req.id, GetTransactionsListResponse {
177                    list: RefCell::new(Some(list)),
178                    limit: p.limit,
179                }),
180                Err(e) => error_to_response(req.id, e),
181            }
182        }
183        MethodParams::GetTransaction(p) => match state.get_transaction(&p.id, None) {
184            Ok(value) => ok_to_response(req.id, value.as_ref().map(encode_base64)),
185            Err(e) => error_to_response(req.id, e),
186        },
187        MethodParams::GetDstTransaction(p) => {
188            match state.get_dst_transaction(&p.message_hash, None) {
189                Ok(value) => ok_to_response(req.id, value.as_ref().map(encode_base64)),
190                Err(e) => error_to_response(req.id, e),
191            }
192        }
193        MethodParams::GetTransactionBlockId(p) => match state.get_transaction_info(&p.id, None) {
194            Ok(value) => ok_to_response(
195                req.id,
196                value.map(|info| BlockIdResponse {
197                    block_id: info.block_id,
198                }),
199            ),
200            Err(e) => error_to_response(req.id, e),
201        },
202        MethodParams::GetKeyBlockProof(p) => {
203            let res = match state.jrpc_cache().get_key_block_proof_response(p.seqno) {
204                Some(value) => value,
205                None => {
206                    let res = state.get_key_block_proof(p.seqno).await;
207                    state
208                        .jrpc_cache()
209                        .insert_key_block_proof_response(p.seqno, res)
210                }
211            };
212            ok_to_response(req.id, res.as_ref())
213        }
214        MethodParams::GetBlockProof(p) => {
215            if !state.config().allow_huge_requests {
216                return error_to_response(req.id, RpcStateError::NotSupported);
217            }
218
219            let proof = state.get_block_proof(&p.block_id).await.map(encode_base64);
220            ok_to_response(req.id, BlockProofResponse { proof })
221        }
222        MethodParams::GetBlockData(p) => {
223            if !state.config().allow_huge_requests {
224                return error_to_response(req.id, RpcStateError::NotSupported);
225            }
226
227            // TODO: Rework rate limiting for this request.
228            let _permit = state.acquire_download_block_permit().await;
229
230            let Some(data) = state.get_block_data(&p.block_id).await else {
231                return ok_to_response(req.id, BlockDataResponse { data: None });
232            };
233
234            tycho_util::sync::rayon_run(move || {
235                let data = encode_base64(data);
236                ok_to_response(req.id, BlockDataResponse { data: Some(data) })
237            })
238            .await
239        }
240        MethodParams::SubSubscribe(p) => with_subscriptions(&state, req.id, |subs| {
241            handle_sub(subs, p, SubscribeAction::Sub)
242        }),
243        MethodParams::SubUnsubscribe(p) => with_subscriptions(&state, req.id, |subs| {
244            handle_sub(subs, p, SubscribeAction::Unsub)
245        }),
246        MethodParams::SubUnsubscribeAll(p) => {
247            with_subscriptions(&state, req.id, |subs| handle_unsub_all(subs, p.uuid))
248        }
249        MethodParams::SubStatus(p) => {
250            with_subscriptions(&state, req.id, |subs| handle_status(subs, p.uuid))
251        }
252        MethodParams::SubListSubscriptions(p) => {
253            with_subscriptions(&state, req.id, |subs| handle_list(subs, p.uuid))
254        }
255    }
256}
257
258// === Requests ===
259
260#[derive(Debug)]
261pub struct EmptyParams;
262
263impl<'de> Deserialize<'de> for EmptyParams {
264    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
265    where
266        D: serde::Deserializer<'de>,
267    {
268        #[derive(Deserialize)]
269        struct Empty {}
270
271        // Accepts both `null` and empty object.
272        <Option<Empty>>::deserialize(deserializer).map(|_| Self)
273    }
274}
275
276#[derive(Debug, Deserialize)]
277pub struct SendMessageRequest {
278    #[serde(with = "Base64BytesWithLimit::<{ ExtMsgRepr::MAX_BOC_SIZE }>")]
279    pub message: bytes::Bytes,
280}
281
282#[derive(Debug, Deserialize)]
283#[serde(rename_all = "camelCase")]
284pub struct GetContractStateRequest {
285    pub address: StdAddr,
286    #[serde(default, with = "serde_helpers::option_string")]
287    pub last_transaction_lt: Option<u64>,
288}
289
290#[derive(Debug, Deserialize)]
291#[serde(rename_all = "camelCase")]
292pub struct GetLibraryCellRequest {
293    pub hash: HashBytes,
294}
295
296#[derive(Debug, Deserialize)]
297#[serde(rename_all = "camelCase")]
298pub struct GetAccountsByCodeHashRequest {
299    pub code_hash: HashBytes,
300    #[serde(default)]
301    pub continuation: Option<StdAddr>,
302    pub limit: u8,
303}
304
305#[derive(Debug, Deserialize)]
306#[serde(rename_all = "camelCase")]
307pub struct GetTransactionsListRequest {
308    pub account: StdAddr,
309    #[serde(default, with = "serde_helpers::option_string")]
310    pub last_transaction_lt: Option<u64>,
311    pub limit: u8,
312}
313
314#[derive(Debug, Deserialize)]
315pub struct GetTransactionRequest {
316    pub id: HashBytes,
317}
318
319#[derive(Debug, Deserialize)]
320pub struct GetKeyBlockProofRequest {
321    pub seqno: u32,
322}
323
324#[derive(Debug, Deserialize)]
325#[serde(rename_all = "camelCase")]
326pub struct GetBlockRequest {
327    #[serde(with = "serde_helpers::string")]
328    pub block_id: BlockId,
329}
330
331#[derive(Debug, Deserialize)]
332#[serde(rename_all = "camelCase")]
333pub struct GetDstTransactionRequest {
334    pub message_hash: HashBytes,
335}
336
337// === Responses ===
338
339// NOTE: `RpcState` full/not-full state is determined only once at startup,
340// so it is ok to cache the response.
341fn get_capabilities(state: &RpcState) -> &'static RawValue {
342    static RESULT: OnceLock<Box<RawValue>> = OnceLock::new();
343    RESULT.get_or_init(|| {
344        let mut capabilities = vec![
345            "getCapabilities",
346            "getLatestKeyBlock",
347            "getBlockchainConfig",
348            "getStatus",
349            "getTimings",
350            "getContractState",
351            "sendMessage",
352            "getLibraryCell",
353            "getKeyBlockProof",
354        ];
355
356        if state.config().allow_huge_requests {
357            capabilities.extend(["getBlockProof", "getBlockData"]);
358        }
359
360        if state.is_full() {
361            capabilities.extend([
362                "getTransactionsList",
363                "getTransaction",
364                "getDstTransaction",
365                "getAccountsByCodeHash",
366                "getTransactionBlockId",
367            ]);
368        }
369
370        {
371            capabilities.extend([
372                "subSubscribe",
373                "subUnsubscribe",
374                "subUnsubscribeAll",
375                "subStatus",
376                "subListSubscriptions",
377                "stream",
378            ]);
379        }
380
381        serde_json::value::to_raw_value(&capabilities).unwrap()
382    })
383}
384
385#[derive(Serialize)]
386pub struct GetStatusResponse {
387    ready: bool,
388}
389
390#[derive(Serialize)]
391#[serde(rename_all = "camelCase", tag = "type")]
392enum GetContractStateResponse<'a> {
393    NotExists {
394        timings: GenTimings,
395    },
396    #[serde(rename_all = "camelCase")]
397    Exists {
398        #[serde(serialize_with = "serialize_account")]
399        account: &'a Account,
400        timings: GenTimings,
401        last_transaction_id: LastTransactionId,
402    },
403    Unchanged {
404        timings: GenTimings,
405    },
406}
407
408fn serialize_account<S>(account: &Account, serializer: S) -> Result<S::Ok, S::Error>
409where
410    S: serde::Serializer,
411{
412    use serde::ser::Error;
413
414    let cell = crate::models::serialize_account(account).map_err(Error::custom)?;
415    Boc::encode_base64(cell).serialize(serializer)
416}
417
418struct GetAccountsByCodeHashResponse<'a> {
419    list: RefCell<Option<CodeHashesIter<'a>>>,
420    limit: u8,
421}
422
423impl GetAccountsByCodeHashResponse<'_> {
424    const MAX_LIMIT: u8 = 100;
425}
426
427impl Serialize for GetAccountsByCodeHashResponse<'_> {
428    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
429    where
430        S: serde::Serializer,
431    {
432        use serde::ser::SerializeSeq;
433
434        let list = self.list.borrow_mut().take().unwrap();
435
436        // NOTE: We cannot use `limit` as the sequence length because
437        // the iterator may return less.
438        let mut seq = serializer.serialize_seq(None)?;
439        for code_hash in list.take(self.limit as usize) {
440            seq.serialize_element(&code_hash)?;
441        }
442        seq.end()
443    }
444}
445
446struct GetTransactionsListResponse {
447    list: RefCell<Option<TransactionsIterBuilder>>,
448    limit: u8,
449}
450
451impl GetTransactionsListResponse {
452    const MAX_LIMIT: u8 = 100;
453}
454
455impl Serialize for GetTransactionsListResponse {
456    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
457    where
458        S: serde::Serializer,
459    {
460        use serde::ser::SerializeSeq;
461
462        let list = self.list.borrow_mut().take().unwrap();
463
464        let mut seq = serializer.serialize_seq(None)?;
465
466        let mut buffer = String::new();
467
468        // NOTE: We use a `.map` from a separate impl thus we cannot use `.try_for_each`.
469        #[allow(clippy::map_collect_result_unit)]
470        list.map(|item| {
471            BASE64_STANDARD.encode_string(item, &mut buffer);
472            let res = seq.serialize_element(&buffer);
473            buffer.clear();
474            Some(res)
475        })
476        .take(self.limit as _)
477        .collect::<Result<(), _>>()?;
478
479        seq.end()
480    }
481}
482
483#[derive(Serialize)]
484#[serde(rename_all = "camelCase")]
485struct BlockIdResponse {
486    #[serde(with = "serde_helpers::string")]
487    block_id: BlockId,
488}
489
490#[derive(Serialize)]
491struct BlockProofResponse {
492    proof: Option<String>,
493}
494
495#[derive(Serialize)]
496struct BlockDataResponse {
497    data: Option<String>,
498}
499
500fn encode_base64<T: AsRef<[u8]>>(value: T) -> String {
501    BASE64_STANDARD.encode(value)
502}
503
504fn with_subscriptions<T, F>(state: &RpcState, id: i64, f: F) -> Response
505where
506    T: Serialize,
507    F: FnOnce(&crate::state::RpcSubscriptions) -> Result<T, RpcStateError>,
508{
509    match f(state.subscriptions()) {
510        Ok(res) => ok_to_response(id, res),
511        Err(e) => error_to_response(id, e),
512    }
513}
514
515fn ok_to_response<T: Serialize>(id: i64, result: T) -> Response {
516    JrpcOkResponse::<_, RfcBehaviour>::new(id, result).into_response()
517}
518
519fn error_to_response(id: i64, e: RpcStateError) -> Response {
520    let (code, message) = match e {
521        RpcStateError::NotReady => (NOT_READY_CODE, Cow::Borrowed("not ready")),
522        RpcStateError::NotSupported => (NOT_SUPPORTED_CODE, Cow::Borrowed("method not supported")),
523        RpcStateError::Internal(e) => (INTERNAL_ERROR_CODE, e.to_string().into()),
524        RpcStateError::BadRequest(e) => (INVALID_PARAMS_CODE, e.to_string().into()),
525    };
526
527    JrpcErrorResponse {
528        id: Some(id),
529        code,
530        message,
531        behaviour: PhantomData::<RfcBehaviour>,
532    }
533    .into_response()
534}
535
536fn too_large_limit_response(id: i64) -> Response {
537    JrpcErrorResponse {
538        id: Some(id),
539        code: TOO_LARGE_LIMIT_CODE,
540        message: Cow::Borrowed("limit is too large"),
541        behaviour: PhantomData::<RfcBehaviour>,
542    }
543    .into_response()
544}