avalanche_types/subnet/rpc/vm/
server.rs

1//! RPC Chain VM Server.
2use std::{
3    sync::Arc,
4    time::{Duration, Instant},
5};
6
7use crate::{
8    ids,
9    packer::U32_LEN,
10    proto::pb::{
11        self,
12        aliasreader::alias_reader_client::AliasReaderClient,
13        google::protobuf::Empty,
14        keystore::keystore_client::KeystoreClient,
15        messenger::{messenger_client::MessengerClient, NotifyRequest},
16        sharedmemory::shared_memory_client::SharedMemoryClient,
17        vm,
18    },
19    subnet::rpc::{
20        consensus::snowman::{Block, Decidable},
21        context::Context,
22        database::rpcdb::{client::DatabaseClient, error_to_error_code},
23        database::{corruptabledb, manager::DatabaseManager},
24        errors,
25        http::server::Server as HttpServer,
26        snow::{
27            engine::common::{appsender::client::AppSenderClient, message::Message},
28            validators::client::ValidatorStateClient,
29            State,
30        },
31        snowman::block::ChainVm,
32        utils::{
33            self,
34            grpc::{self, timestamp_from_time},
35        },
36    },
37};
38use chrono::{TimeZone, Utc};
39use pb::vm::vm_server::Vm;
40use prost::bytes::Bytes;
41use tokio::sync::{broadcast, mpsc, RwLock};
42use tonic::{Request, Response};
43
44pub struct Server<V> {
45    /// Underlying Vm implementation.
46    pub vm: Arc<RwLock<V>>,
47
48    #[cfg(feature = "subnet_metrics")]
49    #[cfg_attr(docsrs, doc(cfg(feature = "subnet_metrics")))]
50    /// Subnet Prometheus process metrics.
51    pub process_metrics: Arc<RwLock<prometheus::Registry>>,
52
53    /// Stop channel broadcast producer.
54    pub stop_ch: broadcast::Sender<()>,
55}
56
57impl<V: ChainVm> Server<V> {
58    pub fn new(vm: V, stop_ch: broadcast::Sender<()>) -> Self {
59        Self {
60            vm: Arc::new(RwLock::new(vm)),
61            #[cfg(feature = "subnet_metrics")]
62            #[cfg_attr(docsrs, doc(cfg(feature = "subnet_metrics")))]
63            process_metrics: Arc::new(RwLock::new(prometheus::default_registry().to_owned())),
64            stop_ch,
65        }
66    }
67
68    /// Attempts to get the ancestors of a block from the underlying Vm.
69    pub async fn vm_ancestors(
70        &self,
71        block_id_bytes: &[u8],
72        max_block_num: i32,
73        max_block_size: i32,
74        max_block_retrival_time: Duration,
75    ) -> std::io::Result<Vec<Bytes>> {
76        let inner_vm = self.vm.read().await;
77        inner_vm
78            .get_ancestors(
79                ids::Id::from_slice(block_id_bytes),
80                max_block_num,
81                max_block_size,
82                max_block_retrival_time,
83            )
84            .await
85    }
86}
87
88#[tonic::async_trait]
89impl<V> Vm for Server<V>
90where
91    V: ChainVm<
92            DatabaseManager = DatabaseManager,
93            AppSender = AppSenderClient,
94            ValidatorState = ValidatorStateClient,
95        > + Send
96        + Sync
97        + 'static,
98{
99    /// Implements "avalanchego/vms/rpcchainvm#VMServer.Initialize".
100    /// ref. <https://github.com/ava-labs/avalanchego/blob/v1.11.1/vms/rpcchainvm/vm_server.go#L98>
101    /// ref. <https://github.com/ava-labs/avalanchego/blob/v1.11.1/vms/rpcchainvm/vm_client.go#L123-L133>
102    async fn initialize(
103        &self,
104        req: Request<vm::InitializeRequest>,
105    ) -> std::result::Result<Response<vm::InitializeResponse>, tonic::Status> {
106        log::info!("initialize called");
107
108        let req = req.into_inner();
109
110        let db_server_addr = req.db_server_addr.as_str();
111        let db_client_conn = utils::grpc::default_client(db_server_addr)?
112            .connect()
113            .await
114            .map_err(|e| {
115                tonic::Status::unknown(format!(
116                    "failed to create db client conn from: {db_server_addr}: {e}",
117                ))
118            })?;
119        let db =
120            corruptabledb::Database::new_boxed(DatabaseClient::new_boxed(db_client_conn.clone()));
121
122        let server_addr = req.server_addr.as_str();
123        let client_conn = utils::grpc::default_client(server_addr)?
124            .connect()
125            .await
126            .map_err(|e| {
127                tonic::Status::unknown(format!(
128                    "failed to create client conn from: {server_addr}: {e}",
129                ))
130            })?;
131
132        // Multiplexing in tonic is done by cloning the client which is very cheap.
133        // ref. https://docs.rs/tonic/latest/tonic/transport/struct.Channel.html#multiplexing-requests
134        let mut message = MessengerClient::new(client_conn.clone());
135        let keystore = KeystoreClient::new(client_conn.clone());
136        let shared_memory = SharedMemoryClient::new(client_conn.clone());
137        let bc_lookup = AliasReaderClient::new(client_conn.clone());
138
139        let ctx: Option<Context<ValidatorStateClient>> = Some(Context {
140            network_id: req.network_id,
141            subnet_id: ids::Id::from_slice(&req.subnet_id),
142            chain_id: ids::Id::from_slice(&req.chain_id),
143            node_id: ids::node::Id::from_slice(&req.node_id),
144            x_chain_id: ids::Id::from_slice(&req.x_chain_id),
145            c_chain_id: ids::Id::from_slice(&req.c_chain_id),
146            avax_asset_id: ids::Id::from_slice(&req.avax_asset_id),
147            keystore,
148            shared_memory,
149            bc_lookup,
150            chain_data_dir: req.chain_data_dir,
151            validator_state: ValidatorStateClient::new(client_conn.clone()),
152        });
153
154        let (tx_engine, mut rx_engine): (mpsc::Sender<Message>, mpsc::Receiver<Message>) =
155            mpsc::channel(100);
156        tokio::spawn(async move {
157            loop {
158                if let Some(msg) = rx_engine.recv().await {
159                    log::debug!("message received: {msg:?}");
160                    let _ = message
161                        .notify(NotifyRequest {
162                            message: msg as i32,
163                        })
164                        .await
165                        .map_err(|s| tonic::Status::unknown(s.to_string()));
166                    continue;
167                }
168
169                log::error!("engine receiver closed unexpectedly");
170                return tonic::Status::unknown("engine receiver closed unexpectedly");
171            }
172        });
173
174        let mut inner_vm = self.vm.write().await;
175        inner_vm
176            .initialize(
177                ctx,
178                db,
179                &req.genesis_bytes,
180                &req.upgrade_bytes,
181                &req.config_bytes,
182                tx_engine,
183                &[()],
184                AppSenderClient::new(client_conn.clone()),
185            )
186            .await
187            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
188
189        // Get last accepted block on the chain
190        let last_accepted = inner_vm.last_accepted().await?;
191
192        let last_accepted_block = inner_vm
193            .get_block(last_accepted)
194            .await
195            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
196
197        log::debug!("last_accepted_block id: {last_accepted:?}");
198
199        Ok(Response::new(vm::InitializeResponse {
200            last_accepted_id: Bytes::from(last_accepted.to_vec()),
201            last_accepted_parent_id: Bytes::from(last_accepted_block.parent().await.to_vec()),
202            bytes: Bytes::from(last_accepted_block.bytes().await.to_vec()),
203            height: last_accepted_block.height().await,
204            timestamp: Some(timestamp_from_time(
205                &Utc.timestamp_opt(last_accepted_block.timestamp().await as i64, 0)
206                    .unwrap(),
207            )),
208        }))
209    }
210
211    async fn shutdown(
212        &self,
213        _req: Request<Empty>,
214    ) -> std::result::Result<Response<Empty>, tonic::Status> {
215        log::debug!("shutdown called");
216
217        // notify all gRPC servers to shutdown
218        self.stop_ch
219            .send(())
220            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
221
222        Ok(Response::new(Empty {}))
223    }
224
225    /// Implements "avalanchego/vms/rpcchainvm#VMServer.CreateHandlers".
226    /// ref. <https://github.com/ava-labs/avalanchego/blob/v1.11.1/vms/rpcchainvm/vm_server.go#L312-L336>
227    /// ref. <https://github.com/ava-labs/avalanchego/blob/v1.11.1/vms/rpcchainvm/vm_client.go#L354-L371>
228    ///
229    /// Creates the HTTP handlers for custom chain network calls.
230    /// This creates and exposes handlers that the outside world can use to communicate
231    /// with the chain. Each handler has the path:
232    /// `\[Address of node]/ext/bc/[chain ID]/[extension\]`
233    ///
234    /// Returns a mapping from \[extension\]s to HTTP handlers.
235    /// Each extension can specify how locking is managed for convenience.
236    ///
237    /// For example, if this VM implements an account-based payments system,
238    /// it have an extension called `accounts`, where clients could get
239    /// information about their accounts.
240    async fn create_handlers(
241        &self,
242        _req: Request<Empty>,
243    ) -> std::result::Result<Response<vm::CreateHandlersResponse>, tonic::Status> {
244        log::debug!("create_handlers called");
245
246        // get handlers from underlying vm
247        let mut inner_vm = self.vm.write().await;
248        let handlers = inner_vm
249            .create_handlers()
250            .await
251            .map_err(|e| tonic::Status::unknown(format!("failed to create handlers: {e}")))?;
252
253        // create and start gRPC server serving HTTP service for each handler
254        let mut resp_handlers: Vec<vm::Handler> = Vec::with_capacity(handlers.keys().len());
255        for (prefix, http_handler) in handlers {
256            let server_addr = utils::new_socket_addr();
257            let server = grpc::Server::new(server_addr, self.stop_ch.subscribe());
258
259            server
260                .serve(pb::http::http_server::HttpServer::new(HttpServer::new(
261                    http_handler.handler,
262                )))
263                .map_err(|e| {
264                    tonic::Status::unknown(format!("failed to create http service: {e}"))
265                })?;
266
267            let resp_handler = vm::Handler {
268                prefix,
269                server_addr: server_addr.to_string(),
270            };
271            resp_handlers.push(resp_handler);
272        }
273
274        Ok(Response::new(vm::CreateHandlersResponse {
275            handlers: resp_handlers,
276        }))
277    }
278
279    async fn build_block(
280        &self,
281        _req: Request<vm::BuildBlockRequest>,
282    ) -> std::result::Result<Response<vm::BuildBlockResponse>, tonic::Status> {
283        log::debug!("build_block called");
284
285        let inner_vm = self.vm.write().await;
286        let block = inner_vm
287            .build_block()
288            .await
289            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
290
291        Ok(Response::new(vm::BuildBlockResponse {
292            id: Bytes::from(block.id().await.to_vec()),
293            parent_id: Bytes::from(block.parent().await.to_vec()),
294            bytes: Bytes::from(block.bytes().await.to_vec()),
295            height: block.height().await,
296            timestamp: Some(timestamp_from_time(
297                &Utc.timestamp_opt(block.timestamp().await as i64, 0)
298                    .unwrap(),
299            )),
300            verify_with_context: false,
301        }))
302    }
303
304    async fn parse_block(
305        &self,
306        req: Request<vm::ParseBlockRequest>,
307    ) -> std::result::Result<Response<vm::ParseBlockResponse>, tonic::Status> {
308        log::debug!("parse_block called");
309
310        let req = req.into_inner();
311        let inner_vm = self.vm.write().await;
312        let block = inner_vm
313            .parse_block(req.bytes.as_ref())
314            .await
315            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
316
317        Ok(Response::new(vm::ParseBlockResponse {
318            id: Bytes::from(block.id().await.to_vec()),
319            parent_id: Bytes::from(block.parent().await.to_vec()),
320            status: block.status().await.to_i32(),
321            height: block.height().await,
322            timestamp: Some(timestamp_from_time(
323                &Utc.timestamp_opt(block.timestamp().await as i64, 0)
324                    .unwrap(),
325            )),
326            verify_with_context: false,
327        }))
328    }
329
330    /// Attempt to load a block.
331    ///
332    /// If the block does not exist, an empty GetBlockResponse is returned with
333    /// an error code.
334    ///
335    /// It is expected that blocks that have been successfully verified should be
336    /// returned correctly. It is also expected that blocks that have been
337    /// accepted by the consensus engine should be able to be fetched. It is not
338    /// required for blocks that have been rejected by the consensus engine to be
339    /// able to be fetched.
340    ///
341    /// ref: <https://pkg.go.dev/github.com/ava-labs/avalanchego/snow/engine/snowman/block#Getter>
342    async fn get_block(
343        &self,
344        req: Request<vm::GetBlockRequest>,
345    ) -> std::result::Result<Response<vm::GetBlockResponse>, tonic::Status> {
346        log::debug!("get_block called");
347
348        let req = req.into_inner();
349        let inner_vm = self.vm.read().await;
350
351        // determine if response is an error or not
352        match inner_vm.get_block(ids::Id::from_slice(&req.id)).await {
353            Ok(block) => Ok(Response::new(vm::GetBlockResponse {
354                parent_id: Bytes::from(block.parent().await.to_vec()),
355                bytes: Bytes::from(block.bytes().await.to_vec()),
356                status: block.status().await.to_i32(),
357                height: block.height().await,
358                timestamp: Some(timestamp_from_time(
359                    &Utc.timestamp_opt(block.timestamp().await as i64, 0)
360                        .unwrap(),
361                )),
362                err: 0, // return 0 indicating no error
363                verify_with_context: false,
364            })),
365            // if an error was found, generate empty response with ErrNotFound code
366            // ref: https://github.com/ava-labs/avalanchego/blob/master/vms/
367            Err(e) => {
368                log::debug!("Error getting block");
369                Ok(Response::new(vm::GetBlockResponse {
370                    parent_id: Bytes::new(),
371                    bytes: Bytes::new(),
372                    status: 0,
373                    height: 0,
374                    timestamp: Some(timestamp_from_time(&Utc.timestamp_opt(0, 0).unwrap())),
375                    err: error_to_error_code(&e.to_string()),
376                    verify_with_context: false,
377                }))
378            }
379        }
380    }
381
382    async fn set_state(
383        &self,
384        req: Request<vm::SetStateRequest>,
385    ) -> std::result::Result<Response<vm::SetStateResponse>, tonic::Status> {
386        log::debug!("set_state called");
387
388        let req = req.into_inner();
389        let inner_vm = self.vm.write().await;
390        let state = State::try_from(req.state)
391            .map_err(|_| tonic::Status::unknown("failed to convert to vm state"))?;
392
393        inner_vm
394            .set_state(state)
395            .await
396            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
397
398        let last_accepted_id = inner_vm
399            .last_accepted()
400            .await
401            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
402
403        let block = inner_vm
404            .get_block(last_accepted_id)
405            .await
406            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
407
408        Ok(Response::new(vm::SetStateResponse {
409            last_accepted_id: Bytes::from(last_accepted_id.to_vec()),
410            last_accepted_parent_id: Bytes::from(block.parent().await.to_vec()),
411            height: block.height().await,
412            bytes: Bytes::from(block.bytes().await.to_vec()),
413            timestamp: Some(timestamp_from_time(
414                &Utc.timestamp_opt(block.timestamp().await as i64, 0)
415                    .unwrap(),
416            )),
417        }))
418    }
419
420    async fn set_preference(
421        &self,
422        req: Request<vm::SetPreferenceRequest>,
423    ) -> std::result::Result<Response<Empty>, tonic::Status> {
424        log::debug!("set_preference called");
425
426        let req = req.into_inner();
427        let inner_vm = self.vm.read().await;
428        inner_vm
429            .set_preference(ids::Id::from_slice(&req.id))
430            .await
431            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
432
433        Ok(Response::new(Empty {}))
434    }
435
436    async fn health(
437        &self,
438        _req: Request<Empty>,
439    ) -> std::result::Result<Response<vm::HealthResponse>, tonic::Status> {
440        log::debug!("health called");
441
442        let inner_vm = self.vm.read().await;
443        let resp = inner_vm
444            .health_check()
445            .await
446            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
447
448        Ok(Response::new(vm::HealthResponse {
449            details: Bytes::from(resp),
450        }))
451    }
452
453    async fn version(
454        &self,
455        _req: Request<Empty>,
456    ) -> std::result::Result<Response<vm::VersionResponse>, tonic::Status> {
457        log::debug!("version called");
458
459        let inner_vm = self.vm.read().await;
460        let version = inner_vm
461            .version()
462            .await
463            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
464
465        Ok(Response::new(vm::VersionResponse { version }))
466    }
467
468    async fn connected(
469        &self,
470        req: Request<vm::ConnectedRequest>,
471    ) -> std::result::Result<Response<Empty>, tonic::Status> {
472        log::debug!("connected called");
473
474        let req = req.into_inner();
475        let inner_vm = self.vm.read().await;
476        let node_id = ids::node::Id::from_slice(&req.node_id);
477        inner_vm
478            .connected(&node_id)
479            .await
480            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
481
482        Ok(Response::new(Empty {}))
483    }
484
485    async fn disconnected(
486        &self,
487        req: Request<vm::DisconnectedRequest>,
488    ) -> std::result::Result<Response<Empty>, tonic::Status> {
489        log::debug!("disconnected called");
490
491        let req = req.into_inner();
492        let inner_vm = self.vm.read().await;
493        let node_id = ids::node::Id::from_slice(&req.node_id);
494
495        inner_vm
496            .disconnected(&node_id)
497            .await
498            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
499
500        Ok(Response::new(Empty {}))
501    }
502
503    async fn app_request(
504        &self,
505        req: Request<vm::AppRequestMsg>,
506    ) -> std::result::Result<Response<Empty>, tonic::Status> {
507        log::debug!("app_request called");
508
509        let req = req.into_inner();
510        let node_id = ids::node::Id::from_slice(&req.node_id);
511        let inner_vm = self.vm.read().await;
512
513        let ts = req.deadline.as_ref().expect("timestamp");
514        let deadline = Utc.timestamp_opt(ts.seconds, ts.nanos as u32).unwrap();
515
516        inner_vm
517            .app_request(&node_id, req.request_id, deadline, &req.request)
518            .await
519            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
520
521        Ok(Response::new(Empty {}))
522    }
523
524    async fn app_request_failed(
525        &self,
526        req: Request<vm::AppRequestFailedMsg>,
527    ) -> std::result::Result<Response<Empty>, tonic::Status> {
528        log::debug!("app_request_failed called");
529
530        let req = req.into_inner();
531        let node_id = ids::node::Id::from_slice(&req.node_id);
532        let inner_vm = self.vm.read().await;
533
534        inner_vm
535            .app_request_failed(&node_id, req.request_id)
536            .await
537            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
538
539        Ok(Response::new(Empty {}))
540    }
541
542    async fn app_response(
543        &self,
544        req: Request<vm::AppResponseMsg>,
545    ) -> std::result::Result<Response<Empty>, tonic::Status> {
546        log::debug!("app_response called");
547
548        let req = req.into_inner();
549        let node_id = ids::node::Id::from_slice(&req.node_id);
550        let inner_vm = self.vm.read().await;
551
552        inner_vm
553            .app_response(&node_id, req.request_id, &req.response)
554            .await
555            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
556
557        Ok(Response::new(Empty {}))
558    }
559
560    async fn app_gossip(
561        &self,
562        req: Request<vm::AppGossipMsg>,
563    ) -> std::result::Result<Response<Empty>, tonic::Status> {
564        log::debug!("app_gossip called");
565
566        let req = req.into_inner();
567        let node_id = ids::node::Id::from_slice(&req.node_id);
568        let inner_vm = self.vm.read().await;
569
570        inner_vm
571            .app_gossip(&node_id, &req.msg)
572            .await
573            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
574
575        Ok(Response::new(Empty {}))
576    }
577
578    async fn block_verify(
579        &self,
580        req: Request<vm::BlockVerifyRequest>,
581    ) -> std::result::Result<Response<vm::BlockVerifyResponse>, tonic::Status> {
582        log::debug!("block_verify called");
583
584        let req = req.into_inner();
585        let inner_vm = self.vm.read().await;
586
587        let mut block = inner_vm
588            .parse_block(&req.bytes)
589            .await
590            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
591
592        block
593            .verify()
594            .await
595            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
596
597        Ok(Response::new(vm::BlockVerifyResponse {
598            timestamp: Some(timestamp_from_time(
599                &Utc.timestamp_opt(block.timestamp().await as i64, 0)
600                    .unwrap(),
601            )),
602        }))
603    }
604
605    async fn block_accept(
606        &self,
607        req: Request<vm::BlockAcceptRequest>,
608    ) -> std::result::Result<Response<Empty>, tonic::Status> {
609        log::debug!("block_accept called");
610
611        let req = req.into_inner();
612        let inner_vm = self.vm.read().await;
613        let id = ids::Id::from_slice(&req.id);
614
615        let mut block = inner_vm
616            .get_block(id)
617            .await
618            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
619
620        block
621            .accept()
622            .await
623            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
624
625        Ok(Response::new(Empty {}))
626    }
627    async fn block_reject(
628        &self,
629        req: Request<vm::BlockRejectRequest>,
630    ) -> std::result::Result<Response<Empty>, tonic::Status> {
631        log::debug!("block_reject called");
632
633        let req = req.into_inner();
634        let inner_vm = self.vm.read().await;
635        let id = ids::Id::from_slice(&req.id);
636
637        let mut block = inner_vm
638            .get_block(id)
639            .await
640            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
641
642        block
643            .reject()
644            .await
645            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
646
647        Ok(Response::new(Empty {}))
648    }
649
650    async fn get_ancestors(
651        &self,
652        req: Request<vm::GetAncestorsRequest>,
653    ) -> std::result::Result<Response<vm::GetAncestorsResponse>, tonic::Status> {
654        log::debug!("get_ancestors called");
655        let req = req.into_inner();
656
657        let block_id = ids::Id::from_slice(req.blk_id.as_ref());
658        let max_blocks_size = usize::try_from(req.max_blocks_size).expect("cast from i32");
659        let max_blocks_num = usize::try_from(req.max_blocks_num).expect("cast from i32");
660        let max_blocks_retrival_time = Duration::from_secs(
661            req.max_blocks_retrival_time
662                .try_into()
663                .expect("valid timestamp"),
664        );
665
666        let ancestors = self
667            .vm_ancestors(
668                req.blk_id.as_ref(),
669                req.max_blocks_num,
670                req.max_blocks_size,
671                max_blocks_retrival_time,
672            )
673            .await
674            .map(|blks_bytes| Response::new(vm::GetAncestorsResponse { blks_bytes }));
675
676        let e = match ancestors {
677            Ok(ancestors) => return Ok(ancestors),
678            Err(e) => e,
679        };
680
681        if e.kind() != std::io::ErrorKind::Unsupported {
682            return Err(tonic::Status::unknown(e.to_string()));
683        }
684
685        // not supported by underlying vm use local logic
686        let start = Instant::now();
687        let mut block = match self.vm.read().await.get_block(block_id).await {
688            Ok(b) => b,
689            Err(e) => {
690                // special case ErrNotFound as an empty response: this signals
691                // the client to avoid contacting this node for further ancestors
692                // as they may have been pruned or unavailable due to state-sync.
693                return if errors::is_not_found(&e) {
694                    log::debug!("get_ancestors local get_block returned: not found");
695
696                    Ok(Response::new(vm::GetAncestorsResponse {
697                        blks_bytes: Vec::new(),
698                    }))
699                } else {
700                    Err(e.into())
701                };
702            }
703        };
704
705        let mut ancestors = Vec::with_capacity(max_blocks_num);
706        let block_bytes = block.bytes().await;
707
708        // length, in bytes, of all elements of ancestors
709        let mut ancestors_bytes_len = block_bytes.len() + U32_LEN;
710        ancestors.push(Bytes::from(block_bytes.to_owned()));
711
712        while ancestors.len() < max_blocks_num {
713            if start.elapsed() < max_blocks_retrival_time {
714                log::debug!("get_ancestors exceeded max block retrival time");
715                break;
716            }
717
718            let parent_id = block.parent().await;
719
720            block = match self.vm.read().await.get_block(parent_id).await {
721                Ok(b) => b,
722                Err(e) => {
723                    if errors::is_not_found(&e) {
724                        // after state sync we may not have the full chain
725                        log::debug!("failed to get block during ancestors lookup parentId: {parent_id}: {e}");
726                    }
727
728                    break;
729                }
730            };
731
732            let block_bytes = block.bytes().await;
733
734            // Ensure response size isn't too large. Include U32_LEN because
735            // the size of the message is included with each container, and the size
736            // is repr. by 4 bytes.
737            ancestors_bytes_len += block_bytes.len() + U32_LEN;
738
739            if ancestors_bytes_len > max_blocks_size {
740                log::debug!("get_ancestors reached maximum response size: {ancestors_bytes_len}");
741                break;
742            }
743
744            ancestors.push(Bytes::from(block_bytes.to_owned()));
745        }
746
747        Ok(Response::new(vm::GetAncestorsResponse {
748            blks_bytes: ancestors,
749        }))
750    }
751
752    async fn batched_parse_block(
753        &self,
754        req: Request<vm::BatchedParseBlockRequest>,
755    ) -> std::result::Result<Response<vm::BatchedParseBlockResponse>, tonic::Status> {
756        log::debug!("batched_parse_block called");
757        let req = req.into_inner();
758
759        let to_parse = req
760            .request
761            .into_iter()
762            .map(|bytes| Request::new(vm::ParseBlockRequest { bytes }))
763            .map(|request| async {
764                self.parse_block(request)
765                    .await
766                    .map(|block| block.into_inner())
767            });
768        let blocks = futures::future::try_join_all(to_parse).await?;
769
770        Ok(Response::new(vm::BatchedParseBlockResponse {
771            response: blocks,
772        }))
773    }
774
775    #[cfg(not(feature = "subnet_metrics"))]
776    async fn gather(
777        &self,
778        _req: Request<Empty>,
779    ) -> std::result::Result<Response<vm::GatherResponse>, tonic::Status> {
780        log::debug!("gather called");
781
782        let metric_families =
783            vec![crate::proto::pb::io::prometheus::client::MetricFamily::default()];
784
785        Ok(Response::new(vm::GatherResponse { metric_families }))
786    }
787
788    #[cfg(feature = "subnet_metrics")]
789    #[cfg_attr(docsrs, doc(cfg(feature = "subnet_metrics")))]
790    async fn gather(
791        &self,
792        _req: Request<Empty>,
793    ) -> std::result::Result<Response<vm::GatherResponse>, tonic::Status> {
794        log::debug!("gather called");
795
796        // ref. <https://prometheus.io/docs/instrumenting/writing_clientlibs/#process-metrics>
797        let metric_families = crate::subnet::rpc::metrics::MetricsFamilies::from(
798            &self.process_metrics.read().await.gather(),
799        )
800        .mfs;
801
802        Ok(Response::new(vm::GatherResponse { metric_families }))
803    }
804
805    async fn cross_chain_app_request(
806        &self,
807        req: Request<vm::CrossChainAppRequestMsg>,
808    ) -> std::result::Result<Response<Empty>, tonic::Status> {
809        log::debug!("cross_chain_app_request called");
810        let msg = req.into_inner();
811        let chain_id = &ids::Id::from_slice(&msg.chain_id);
812
813        let ts = msg.deadline.as_ref().expect("timestamp");
814        let deadline = Utc.timestamp_opt(ts.seconds, ts.nanos as u32).unwrap();
815
816        let inner_vm = self.vm.read().await;
817        inner_vm
818            .cross_chain_app_request(chain_id, msg.request_id, deadline, &msg.request)
819            .await
820            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
821
822        Ok(Response::new(Empty {}))
823    }
824
825    async fn cross_chain_app_request_failed(
826        &self,
827        req: Request<vm::CrossChainAppRequestFailedMsg>,
828    ) -> std::result::Result<Response<Empty>, tonic::Status> {
829        log::debug!("cross_chain_app_request_failed called");
830        let msg = req.into_inner();
831        let chain_id = &ids::Id::from_slice(&msg.chain_id);
832
833        let inner_vm = self.vm.read().await;
834        inner_vm
835            .cross_chain_app_request_failed(chain_id, msg.request_id)
836            .await
837            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
838
839        Ok(Response::new(Empty {}))
840    }
841
842    async fn cross_chain_app_response(
843        &self,
844        req: Request<vm::CrossChainAppResponseMsg>,
845    ) -> std::result::Result<Response<Empty>, tonic::Status> {
846        log::debug!("cross_chain_app_response called");
847        let msg = req.into_inner();
848        let chain_id = &ids::Id::from_slice(&msg.chain_id);
849
850        let inner_vm = self.vm.read().await;
851        inner_vm
852            .cross_chain_app_response(chain_id, msg.request_id, &msg.response)
853            .await
854            .map_err(|e| tonic::Status::unknown(e.to_string()))?;
855
856        Ok(Response::new(Empty {}))
857    }
858
859    async fn state_sync_enabled(
860        &self,
861        _req: Request<Empty>,
862    ) -> std::result::Result<Response<vm::StateSyncEnabledResponse>, tonic::Status> {
863        log::debug!("state_sync_enabled called");
864
865        // TODO: Implement state sync request/response
866        Ok(Response::new(vm::StateSyncEnabledResponse {
867            enabled: false,
868            err: 0,
869        }))
870    }
871
872    async fn get_ongoing_sync_state_summary(
873        &self,
874        _req: Request<Empty>,
875    ) -> std::result::Result<Response<vm::GetOngoingSyncStateSummaryResponse>, tonic::Status> {
876        log::debug!("get_ongoing_sync_state_summary called");
877
878        Err(tonic::Status::unimplemented(
879            "get_ongoing_sync_state_summary",
880        ))
881    }
882
883    async fn parse_state_summary(
884        &self,
885        _req: Request<vm::ParseStateSummaryRequest>,
886    ) -> std::result::Result<tonic::Response<vm::ParseStateSummaryResponse>, tonic::Status> {
887        log::debug!("parse_state_summary called");
888
889        Err(tonic::Status::unimplemented("parse_state_summary"))
890    }
891
892    async fn get_state_summary(
893        &self,
894        _req: Request<vm::GetStateSummaryRequest>,
895    ) -> std::result::Result<Response<vm::GetStateSummaryResponse>, tonic::Status> {
896        log::debug!("get_state_summary called");
897
898        Err(tonic::Status::unimplemented("get_state_summary"))
899    }
900
901    async fn get_last_state_summary(
902        &self,
903        _req: Request<Empty>,
904    ) -> std::result::Result<Response<vm::GetLastStateSummaryResponse>, tonic::Status> {
905        log::debug!("get_last_state_summary called");
906
907        Err(tonic::Status::unimplemented("get_last_state_summary"))
908    }
909
910    async fn state_summary_accept(
911        &self,
912        _req: Request<vm::StateSummaryAcceptRequest>,
913    ) -> std::result::Result<tonic::Response<vm::StateSummaryAcceptResponse>, tonic::Status> {
914        log::debug!("state_summary_accept called");
915
916        Err(tonic::Status::unimplemented("state_summary_accept"))
917    }
918
919    async fn verify_height_index(
920        &self,
921        _req: Request<Empty>,
922    ) -> std::result::Result<Response<vm::VerifyHeightIndexResponse>, tonic::Status> {
923        log::debug!("verify_height_index called");
924
925        let inner_vm = self.vm.read().await;
926
927        match inner_vm.verify_height_index().await {
928            Ok(_) => return Ok(Response::new(vm::VerifyHeightIndexResponse { err: 0 })),
929            Err(e) => {
930                if error_to_error_code(&e.to_string()) != 0 {
931                    return Ok(Response::new(vm::VerifyHeightIndexResponse {
932                        err: error_to_error_code(&e.to_string()),
933                    }));
934                }
935                return Err(tonic::Status::unknown(e.to_string()));
936            }
937        }
938    }
939
940    async fn get_block_id_at_height(
941        &self,
942        req: Request<vm::GetBlockIdAtHeightRequest>,
943    ) -> std::result::Result<Response<vm::GetBlockIdAtHeightResponse>, tonic::Status> {
944        log::debug!("get_block_id_at_height called");
945
946        let msg = req.into_inner();
947        let inner_vm = self.vm.read().await;
948
949        match inner_vm.get_block_id_at_height(msg.height).await {
950            Ok(height) => {
951                return Ok(Response::new(vm::GetBlockIdAtHeightResponse {
952                    blk_id: height.to_vec().into(),
953                    err: 0,
954                }))
955            }
956            Err(e) => {
957                if error_to_error_code(&e.to_string()) != 0 {
958                    return Ok(Response::new(vm::GetBlockIdAtHeightResponse {
959                        blk_id: vec![].into(),
960                        err: error_to_error_code(&e.to_string()),
961                    }));
962                }
963                return Err(tonic::Status::unknown(e.to_string()));
964            }
965        }
966    }
967}