1use std::{
3 sync::Arc,
4 time::{Duration, Instant},
5};
6
7use crate::{
8 ids,
9 packer::U32_LEN,
10 proto::pb::{
11 self,
12 aliasreader::alias_reader_client::AliasReaderClient,
13 google::protobuf::Empty,
14 keystore::keystore_client::KeystoreClient,
15 messenger::{messenger_client::MessengerClient, NotifyRequest},
16 sharedmemory::shared_memory_client::SharedMemoryClient,
17 vm,
18 },
19 subnet::rpc::{
20 consensus::snowman::{Block, Decidable},
21 context::Context,
22 database::rpcdb::{client::DatabaseClient, error_to_error_code},
23 database::{corruptabledb, manager::DatabaseManager},
24 errors,
25 http::server::Server as HttpServer,
26 snow::{
27 engine::common::{appsender::client::AppSenderClient, message::Message},
28 validators::client::ValidatorStateClient,
29 State,
30 },
31 snowman::block::ChainVm,
32 utils::{
33 self,
34 grpc::{self, timestamp_from_time},
35 },
36 },
37};
38use chrono::{TimeZone, Utc};
39use pb::vm::vm_server::Vm;
40use prost::bytes::Bytes;
41use tokio::sync::{broadcast, mpsc, RwLock};
42use tonic::{Request, Response};
43
44pub struct Server<V> {
45 pub vm: Arc<RwLock<V>>,
47
48 #[cfg(feature = "subnet_metrics")]
49 #[cfg_attr(docsrs, doc(cfg(feature = "subnet_metrics")))]
50 pub process_metrics: Arc<RwLock<prometheus::Registry>>,
52
53 pub stop_ch: broadcast::Sender<()>,
55}
56
57impl<V: ChainVm> Server<V> {
58 pub fn new(vm: V, stop_ch: broadcast::Sender<()>) -> Self {
59 Self {
60 vm: Arc::new(RwLock::new(vm)),
61 #[cfg(feature = "subnet_metrics")]
62 #[cfg_attr(docsrs, doc(cfg(feature = "subnet_metrics")))]
63 process_metrics: Arc::new(RwLock::new(prometheus::default_registry().to_owned())),
64 stop_ch,
65 }
66 }
67
68 pub async fn vm_ancestors(
70 &self,
71 block_id_bytes: &[u8],
72 max_block_num: i32,
73 max_block_size: i32,
74 max_block_retrival_time: Duration,
75 ) -> std::io::Result<Vec<Bytes>> {
76 let inner_vm = self.vm.read().await;
77 inner_vm
78 .get_ancestors(
79 ids::Id::from_slice(block_id_bytes),
80 max_block_num,
81 max_block_size,
82 max_block_retrival_time,
83 )
84 .await
85 }
86}
87
88#[tonic::async_trait]
89impl<V> Vm for Server<V>
90where
91 V: ChainVm<
92 DatabaseManager = DatabaseManager,
93 AppSender = AppSenderClient,
94 ValidatorState = ValidatorStateClient,
95 > + Send
96 + Sync
97 + 'static,
98{
99 async fn initialize(
103 &self,
104 req: Request<vm::InitializeRequest>,
105 ) -> std::result::Result<Response<vm::InitializeResponse>, tonic::Status> {
106 log::info!("initialize called");
107
108 let req = req.into_inner();
109
110 let db_server_addr = req.db_server_addr.as_str();
111 let db_client_conn = utils::grpc::default_client(db_server_addr)?
112 .connect()
113 .await
114 .map_err(|e| {
115 tonic::Status::unknown(format!(
116 "failed to create db client conn from: {db_server_addr}: {e}",
117 ))
118 })?;
119 let db =
120 corruptabledb::Database::new_boxed(DatabaseClient::new_boxed(db_client_conn.clone()));
121
122 let server_addr = req.server_addr.as_str();
123 let client_conn = utils::grpc::default_client(server_addr)?
124 .connect()
125 .await
126 .map_err(|e| {
127 tonic::Status::unknown(format!(
128 "failed to create client conn from: {server_addr}: {e}",
129 ))
130 })?;
131
132 let mut message = MessengerClient::new(client_conn.clone());
135 let keystore = KeystoreClient::new(client_conn.clone());
136 let shared_memory = SharedMemoryClient::new(client_conn.clone());
137 let bc_lookup = AliasReaderClient::new(client_conn.clone());
138
139 let ctx: Option<Context<ValidatorStateClient>> = Some(Context {
140 network_id: req.network_id,
141 subnet_id: ids::Id::from_slice(&req.subnet_id),
142 chain_id: ids::Id::from_slice(&req.chain_id),
143 node_id: ids::node::Id::from_slice(&req.node_id),
144 x_chain_id: ids::Id::from_slice(&req.x_chain_id),
145 c_chain_id: ids::Id::from_slice(&req.c_chain_id),
146 avax_asset_id: ids::Id::from_slice(&req.avax_asset_id),
147 keystore,
148 shared_memory,
149 bc_lookup,
150 chain_data_dir: req.chain_data_dir,
151 validator_state: ValidatorStateClient::new(client_conn.clone()),
152 });
153
154 let (tx_engine, mut rx_engine): (mpsc::Sender<Message>, mpsc::Receiver<Message>) =
155 mpsc::channel(100);
156 tokio::spawn(async move {
157 loop {
158 if let Some(msg) = rx_engine.recv().await {
159 log::debug!("message received: {msg:?}");
160 let _ = message
161 .notify(NotifyRequest {
162 message: msg as i32,
163 })
164 .await
165 .map_err(|s| tonic::Status::unknown(s.to_string()));
166 continue;
167 }
168
169 log::error!("engine receiver closed unexpectedly");
170 return tonic::Status::unknown("engine receiver closed unexpectedly");
171 }
172 });
173
174 let mut inner_vm = self.vm.write().await;
175 inner_vm
176 .initialize(
177 ctx,
178 db,
179 &req.genesis_bytes,
180 &req.upgrade_bytes,
181 &req.config_bytes,
182 tx_engine,
183 &[()],
184 AppSenderClient::new(client_conn.clone()),
185 )
186 .await
187 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
188
189 let last_accepted = inner_vm.last_accepted().await?;
191
192 let last_accepted_block = inner_vm
193 .get_block(last_accepted)
194 .await
195 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
196
197 log::debug!("last_accepted_block id: {last_accepted:?}");
198
199 Ok(Response::new(vm::InitializeResponse {
200 last_accepted_id: Bytes::from(last_accepted.to_vec()),
201 last_accepted_parent_id: Bytes::from(last_accepted_block.parent().await.to_vec()),
202 bytes: Bytes::from(last_accepted_block.bytes().await.to_vec()),
203 height: last_accepted_block.height().await,
204 timestamp: Some(timestamp_from_time(
205 &Utc.timestamp_opt(last_accepted_block.timestamp().await as i64, 0)
206 .unwrap(),
207 )),
208 }))
209 }
210
211 async fn shutdown(
212 &self,
213 _req: Request<Empty>,
214 ) -> std::result::Result<Response<Empty>, tonic::Status> {
215 log::debug!("shutdown called");
216
217 self.stop_ch
219 .send(())
220 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
221
222 Ok(Response::new(Empty {}))
223 }
224
225 async fn create_handlers(
241 &self,
242 _req: Request<Empty>,
243 ) -> std::result::Result<Response<vm::CreateHandlersResponse>, tonic::Status> {
244 log::debug!("create_handlers called");
245
246 let mut inner_vm = self.vm.write().await;
248 let handlers = inner_vm
249 .create_handlers()
250 .await
251 .map_err(|e| tonic::Status::unknown(format!("failed to create handlers: {e}")))?;
252
253 let mut resp_handlers: Vec<vm::Handler> = Vec::with_capacity(handlers.keys().len());
255 for (prefix, http_handler) in handlers {
256 let server_addr = utils::new_socket_addr();
257 let server = grpc::Server::new(server_addr, self.stop_ch.subscribe());
258
259 server
260 .serve(pb::http::http_server::HttpServer::new(HttpServer::new(
261 http_handler.handler,
262 )))
263 .map_err(|e| {
264 tonic::Status::unknown(format!("failed to create http service: {e}"))
265 })?;
266
267 let resp_handler = vm::Handler {
268 prefix,
269 server_addr: server_addr.to_string(),
270 };
271 resp_handlers.push(resp_handler);
272 }
273
274 Ok(Response::new(vm::CreateHandlersResponse {
275 handlers: resp_handlers,
276 }))
277 }
278
279 async fn build_block(
280 &self,
281 _req: Request<vm::BuildBlockRequest>,
282 ) -> std::result::Result<Response<vm::BuildBlockResponse>, tonic::Status> {
283 log::debug!("build_block called");
284
285 let inner_vm = self.vm.write().await;
286 let block = inner_vm
287 .build_block()
288 .await
289 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
290
291 Ok(Response::new(vm::BuildBlockResponse {
292 id: Bytes::from(block.id().await.to_vec()),
293 parent_id: Bytes::from(block.parent().await.to_vec()),
294 bytes: Bytes::from(block.bytes().await.to_vec()),
295 height: block.height().await,
296 timestamp: Some(timestamp_from_time(
297 &Utc.timestamp_opt(block.timestamp().await as i64, 0)
298 .unwrap(),
299 )),
300 verify_with_context: false,
301 }))
302 }
303
304 async fn parse_block(
305 &self,
306 req: Request<vm::ParseBlockRequest>,
307 ) -> std::result::Result<Response<vm::ParseBlockResponse>, tonic::Status> {
308 log::debug!("parse_block called");
309
310 let req = req.into_inner();
311 let inner_vm = self.vm.write().await;
312 let block = inner_vm
313 .parse_block(req.bytes.as_ref())
314 .await
315 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
316
317 Ok(Response::new(vm::ParseBlockResponse {
318 id: Bytes::from(block.id().await.to_vec()),
319 parent_id: Bytes::from(block.parent().await.to_vec()),
320 status: block.status().await.to_i32(),
321 height: block.height().await,
322 timestamp: Some(timestamp_from_time(
323 &Utc.timestamp_opt(block.timestamp().await as i64, 0)
324 .unwrap(),
325 )),
326 verify_with_context: false,
327 }))
328 }
329
330 async fn get_block(
343 &self,
344 req: Request<vm::GetBlockRequest>,
345 ) -> std::result::Result<Response<vm::GetBlockResponse>, tonic::Status> {
346 log::debug!("get_block called");
347
348 let req = req.into_inner();
349 let inner_vm = self.vm.read().await;
350
351 match inner_vm.get_block(ids::Id::from_slice(&req.id)).await {
353 Ok(block) => Ok(Response::new(vm::GetBlockResponse {
354 parent_id: Bytes::from(block.parent().await.to_vec()),
355 bytes: Bytes::from(block.bytes().await.to_vec()),
356 status: block.status().await.to_i32(),
357 height: block.height().await,
358 timestamp: Some(timestamp_from_time(
359 &Utc.timestamp_opt(block.timestamp().await as i64, 0)
360 .unwrap(),
361 )),
362 err: 0, verify_with_context: false,
364 })),
365 Err(e) => {
368 log::debug!("Error getting block");
369 Ok(Response::new(vm::GetBlockResponse {
370 parent_id: Bytes::new(),
371 bytes: Bytes::new(),
372 status: 0,
373 height: 0,
374 timestamp: Some(timestamp_from_time(&Utc.timestamp_opt(0, 0).unwrap())),
375 err: error_to_error_code(&e.to_string()),
376 verify_with_context: false,
377 }))
378 }
379 }
380 }
381
382 async fn set_state(
383 &self,
384 req: Request<vm::SetStateRequest>,
385 ) -> std::result::Result<Response<vm::SetStateResponse>, tonic::Status> {
386 log::debug!("set_state called");
387
388 let req = req.into_inner();
389 let inner_vm = self.vm.write().await;
390 let state = State::try_from(req.state)
391 .map_err(|_| tonic::Status::unknown("failed to convert to vm state"))?;
392
393 inner_vm
394 .set_state(state)
395 .await
396 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
397
398 let last_accepted_id = inner_vm
399 .last_accepted()
400 .await
401 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
402
403 let block = inner_vm
404 .get_block(last_accepted_id)
405 .await
406 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
407
408 Ok(Response::new(vm::SetStateResponse {
409 last_accepted_id: Bytes::from(last_accepted_id.to_vec()),
410 last_accepted_parent_id: Bytes::from(block.parent().await.to_vec()),
411 height: block.height().await,
412 bytes: Bytes::from(block.bytes().await.to_vec()),
413 timestamp: Some(timestamp_from_time(
414 &Utc.timestamp_opt(block.timestamp().await as i64, 0)
415 .unwrap(),
416 )),
417 }))
418 }
419
420 async fn set_preference(
421 &self,
422 req: Request<vm::SetPreferenceRequest>,
423 ) -> std::result::Result<Response<Empty>, tonic::Status> {
424 log::debug!("set_preference called");
425
426 let req = req.into_inner();
427 let inner_vm = self.vm.read().await;
428 inner_vm
429 .set_preference(ids::Id::from_slice(&req.id))
430 .await
431 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
432
433 Ok(Response::new(Empty {}))
434 }
435
436 async fn health(
437 &self,
438 _req: Request<Empty>,
439 ) -> std::result::Result<Response<vm::HealthResponse>, tonic::Status> {
440 log::debug!("health called");
441
442 let inner_vm = self.vm.read().await;
443 let resp = inner_vm
444 .health_check()
445 .await
446 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
447
448 Ok(Response::new(vm::HealthResponse {
449 details: Bytes::from(resp),
450 }))
451 }
452
453 async fn version(
454 &self,
455 _req: Request<Empty>,
456 ) -> std::result::Result<Response<vm::VersionResponse>, tonic::Status> {
457 log::debug!("version called");
458
459 let inner_vm = self.vm.read().await;
460 let version = inner_vm
461 .version()
462 .await
463 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
464
465 Ok(Response::new(vm::VersionResponse { version }))
466 }
467
468 async fn connected(
469 &self,
470 req: Request<vm::ConnectedRequest>,
471 ) -> std::result::Result<Response<Empty>, tonic::Status> {
472 log::debug!("connected called");
473
474 let req = req.into_inner();
475 let inner_vm = self.vm.read().await;
476 let node_id = ids::node::Id::from_slice(&req.node_id);
477 inner_vm
478 .connected(&node_id)
479 .await
480 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
481
482 Ok(Response::new(Empty {}))
483 }
484
485 async fn disconnected(
486 &self,
487 req: Request<vm::DisconnectedRequest>,
488 ) -> std::result::Result<Response<Empty>, tonic::Status> {
489 log::debug!("disconnected called");
490
491 let req = req.into_inner();
492 let inner_vm = self.vm.read().await;
493 let node_id = ids::node::Id::from_slice(&req.node_id);
494
495 inner_vm
496 .disconnected(&node_id)
497 .await
498 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
499
500 Ok(Response::new(Empty {}))
501 }
502
503 async fn app_request(
504 &self,
505 req: Request<vm::AppRequestMsg>,
506 ) -> std::result::Result<Response<Empty>, tonic::Status> {
507 log::debug!("app_request called");
508
509 let req = req.into_inner();
510 let node_id = ids::node::Id::from_slice(&req.node_id);
511 let inner_vm = self.vm.read().await;
512
513 let ts = req.deadline.as_ref().expect("timestamp");
514 let deadline = Utc.timestamp_opt(ts.seconds, ts.nanos as u32).unwrap();
515
516 inner_vm
517 .app_request(&node_id, req.request_id, deadline, &req.request)
518 .await
519 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
520
521 Ok(Response::new(Empty {}))
522 }
523
524 async fn app_request_failed(
525 &self,
526 req: Request<vm::AppRequestFailedMsg>,
527 ) -> std::result::Result<Response<Empty>, tonic::Status> {
528 log::debug!("app_request_failed called");
529
530 let req = req.into_inner();
531 let node_id = ids::node::Id::from_slice(&req.node_id);
532 let inner_vm = self.vm.read().await;
533
534 inner_vm
535 .app_request_failed(&node_id, req.request_id)
536 .await
537 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
538
539 Ok(Response::new(Empty {}))
540 }
541
542 async fn app_response(
543 &self,
544 req: Request<vm::AppResponseMsg>,
545 ) -> std::result::Result<Response<Empty>, tonic::Status> {
546 log::debug!("app_response called");
547
548 let req = req.into_inner();
549 let node_id = ids::node::Id::from_slice(&req.node_id);
550 let inner_vm = self.vm.read().await;
551
552 inner_vm
553 .app_response(&node_id, req.request_id, &req.response)
554 .await
555 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
556
557 Ok(Response::new(Empty {}))
558 }
559
560 async fn app_gossip(
561 &self,
562 req: Request<vm::AppGossipMsg>,
563 ) -> std::result::Result<Response<Empty>, tonic::Status> {
564 log::debug!("app_gossip called");
565
566 let req = req.into_inner();
567 let node_id = ids::node::Id::from_slice(&req.node_id);
568 let inner_vm = self.vm.read().await;
569
570 inner_vm
571 .app_gossip(&node_id, &req.msg)
572 .await
573 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
574
575 Ok(Response::new(Empty {}))
576 }
577
578 async fn block_verify(
579 &self,
580 req: Request<vm::BlockVerifyRequest>,
581 ) -> std::result::Result<Response<vm::BlockVerifyResponse>, tonic::Status> {
582 log::debug!("block_verify called");
583
584 let req = req.into_inner();
585 let inner_vm = self.vm.read().await;
586
587 let mut block = inner_vm
588 .parse_block(&req.bytes)
589 .await
590 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
591
592 block
593 .verify()
594 .await
595 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
596
597 Ok(Response::new(vm::BlockVerifyResponse {
598 timestamp: Some(timestamp_from_time(
599 &Utc.timestamp_opt(block.timestamp().await as i64, 0)
600 .unwrap(),
601 )),
602 }))
603 }
604
605 async fn block_accept(
606 &self,
607 req: Request<vm::BlockAcceptRequest>,
608 ) -> std::result::Result<Response<Empty>, tonic::Status> {
609 log::debug!("block_accept called");
610
611 let req = req.into_inner();
612 let inner_vm = self.vm.read().await;
613 let id = ids::Id::from_slice(&req.id);
614
615 let mut block = inner_vm
616 .get_block(id)
617 .await
618 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
619
620 block
621 .accept()
622 .await
623 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
624
625 Ok(Response::new(Empty {}))
626 }
627 async fn block_reject(
628 &self,
629 req: Request<vm::BlockRejectRequest>,
630 ) -> std::result::Result<Response<Empty>, tonic::Status> {
631 log::debug!("block_reject called");
632
633 let req = req.into_inner();
634 let inner_vm = self.vm.read().await;
635 let id = ids::Id::from_slice(&req.id);
636
637 let mut block = inner_vm
638 .get_block(id)
639 .await
640 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
641
642 block
643 .reject()
644 .await
645 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
646
647 Ok(Response::new(Empty {}))
648 }
649
650 async fn get_ancestors(
651 &self,
652 req: Request<vm::GetAncestorsRequest>,
653 ) -> std::result::Result<Response<vm::GetAncestorsResponse>, tonic::Status> {
654 log::debug!("get_ancestors called");
655 let req = req.into_inner();
656
657 let block_id = ids::Id::from_slice(req.blk_id.as_ref());
658 let max_blocks_size = usize::try_from(req.max_blocks_size).expect("cast from i32");
659 let max_blocks_num = usize::try_from(req.max_blocks_num).expect("cast from i32");
660 let max_blocks_retrival_time = Duration::from_secs(
661 req.max_blocks_retrival_time
662 .try_into()
663 .expect("valid timestamp"),
664 );
665
666 let ancestors = self
667 .vm_ancestors(
668 req.blk_id.as_ref(),
669 req.max_blocks_num,
670 req.max_blocks_size,
671 max_blocks_retrival_time,
672 )
673 .await
674 .map(|blks_bytes| Response::new(vm::GetAncestorsResponse { blks_bytes }));
675
676 let e = match ancestors {
677 Ok(ancestors) => return Ok(ancestors),
678 Err(e) => e,
679 };
680
681 if e.kind() != std::io::ErrorKind::Unsupported {
682 return Err(tonic::Status::unknown(e.to_string()));
683 }
684
685 let start = Instant::now();
687 let mut block = match self.vm.read().await.get_block(block_id).await {
688 Ok(b) => b,
689 Err(e) => {
690 return if errors::is_not_found(&e) {
694 log::debug!("get_ancestors local get_block returned: not found");
695
696 Ok(Response::new(vm::GetAncestorsResponse {
697 blks_bytes: Vec::new(),
698 }))
699 } else {
700 Err(e.into())
701 };
702 }
703 };
704
705 let mut ancestors = Vec::with_capacity(max_blocks_num);
706 let block_bytes = block.bytes().await;
707
708 let mut ancestors_bytes_len = block_bytes.len() + U32_LEN;
710 ancestors.push(Bytes::from(block_bytes.to_owned()));
711
712 while ancestors.len() < max_blocks_num {
713 if start.elapsed() < max_blocks_retrival_time {
714 log::debug!("get_ancestors exceeded max block retrival time");
715 break;
716 }
717
718 let parent_id = block.parent().await;
719
720 block = match self.vm.read().await.get_block(parent_id).await {
721 Ok(b) => b,
722 Err(e) => {
723 if errors::is_not_found(&e) {
724 log::debug!("failed to get block during ancestors lookup parentId: {parent_id}: {e}");
726 }
727
728 break;
729 }
730 };
731
732 let block_bytes = block.bytes().await;
733
734 ancestors_bytes_len += block_bytes.len() + U32_LEN;
738
739 if ancestors_bytes_len > max_blocks_size {
740 log::debug!("get_ancestors reached maximum response size: {ancestors_bytes_len}");
741 break;
742 }
743
744 ancestors.push(Bytes::from(block_bytes.to_owned()));
745 }
746
747 Ok(Response::new(vm::GetAncestorsResponse {
748 blks_bytes: ancestors,
749 }))
750 }
751
752 async fn batched_parse_block(
753 &self,
754 req: Request<vm::BatchedParseBlockRequest>,
755 ) -> std::result::Result<Response<vm::BatchedParseBlockResponse>, tonic::Status> {
756 log::debug!("batched_parse_block called");
757 let req = req.into_inner();
758
759 let to_parse = req
760 .request
761 .into_iter()
762 .map(|bytes| Request::new(vm::ParseBlockRequest { bytes }))
763 .map(|request| async {
764 self.parse_block(request)
765 .await
766 .map(|block| block.into_inner())
767 });
768 let blocks = futures::future::try_join_all(to_parse).await?;
769
770 Ok(Response::new(vm::BatchedParseBlockResponse {
771 response: blocks,
772 }))
773 }
774
775 #[cfg(not(feature = "subnet_metrics"))]
776 async fn gather(
777 &self,
778 _req: Request<Empty>,
779 ) -> std::result::Result<Response<vm::GatherResponse>, tonic::Status> {
780 log::debug!("gather called");
781
782 let metric_families =
783 vec![crate::proto::pb::io::prometheus::client::MetricFamily::default()];
784
785 Ok(Response::new(vm::GatherResponse { metric_families }))
786 }
787
788 #[cfg(feature = "subnet_metrics")]
789 #[cfg_attr(docsrs, doc(cfg(feature = "subnet_metrics")))]
790 async fn gather(
791 &self,
792 _req: Request<Empty>,
793 ) -> std::result::Result<Response<vm::GatherResponse>, tonic::Status> {
794 log::debug!("gather called");
795
796 let metric_families = crate::subnet::rpc::metrics::MetricsFamilies::from(
798 &self.process_metrics.read().await.gather(),
799 )
800 .mfs;
801
802 Ok(Response::new(vm::GatherResponse { metric_families }))
803 }
804
805 async fn cross_chain_app_request(
806 &self,
807 req: Request<vm::CrossChainAppRequestMsg>,
808 ) -> std::result::Result<Response<Empty>, tonic::Status> {
809 log::debug!("cross_chain_app_request called");
810 let msg = req.into_inner();
811 let chain_id = &ids::Id::from_slice(&msg.chain_id);
812
813 let ts = msg.deadline.as_ref().expect("timestamp");
814 let deadline = Utc.timestamp_opt(ts.seconds, ts.nanos as u32).unwrap();
815
816 let inner_vm = self.vm.read().await;
817 inner_vm
818 .cross_chain_app_request(chain_id, msg.request_id, deadline, &msg.request)
819 .await
820 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
821
822 Ok(Response::new(Empty {}))
823 }
824
825 async fn cross_chain_app_request_failed(
826 &self,
827 req: Request<vm::CrossChainAppRequestFailedMsg>,
828 ) -> std::result::Result<Response<Empty>, tonic::Status> {
829 log::debug!("cross_chain_app_request_failed called");
830 let msg = req.into_inner();
831 let chain_id = &ids::Id::from_slice(&msg.chain_id);
832
833 let inner_vm = self.vm.read().await;
834 inner_vm
835 .cross_chain_app_request_failed(chain_id, msg.request_id)
836 .await
837 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
838
839 Ok(Response::new(Empty {}))
840 }
841
842 async fn cross_chain_app_response(
843 &self,
844 req: Request<vm::CrossChainAppResponseMsg>,
845 ) -> std::result::Result<Response<Empty>, tonic::Status> {
846 log::debug!("cross_chain_app_response called");
847 let msg = req.into_inner();
848 let chain_id = &ids::Id::from_slice(&msg.chain_id);
849
850 let inner_vm = self.vm.read().await;
851 inner_vm
852 .cross_chain_app_response(chain_id, msg.request_id, &msg.response)
853 .await
854 .map_err(|e| tonic::Status::unknown(e.to_string()))?;
855
856 Ok(Response::new(Empty {}))
857 }
858
859 async fn state_sync_enabled(
860 &self,
861 _req: Request<Empty>,
862 ) -> std::result::Result<Response<vm::StateSyncEnabledResponse>, tonic::Status> {
863 log::debug!("state_sync_enabled called");
864
865 Ok(Response::new(vm::StateSyncEnabledResponse {
867 enabled: false,
868 err: 0,
869 }))
870 }
871
872 async fn get_ongoing_sync_state_summary(
873 &self,
874 _req: Request<Empty>,
875 ) -> std::result::Result<Response<vm::GetOngoingSyncStateSummaryResponse>, tonic::Status> {
876 log::debug!("get_ongoing_sync_state_summary called");
877
878 Err(tonic::Status::unimplemented(
879 "get_ongoing_sync_state_summary",
880 ))
881 }
882
883 async fn parse_state_summary(
884 &self,
885 _req: Request<vm::ParseStateSummaryRequest>,
886 ) -> std::result::Result<tonic::Response<vm::ParseStateSummaryResponse>, tonic::Status> {
887 log::debug!("parse_state_summary called");
888
889 Err(tonic::Status::unimplemented("parse_state_summary"))
890 }
891
892 async fn get_state_summary(
893 &self,
894 _req: Request<vm::GetStateSummaryRequest>,
895 ) -> std::result::Result<Response<vm::GetStateSummaryResponse>, tonic::Status> {
896 log::debug!("get_state_summary called");
897
898 Err(tonic::Status::unimplemented("get_state_summary"))
899 }
900
901 async fn get_last_state_summary(
902 &self,
903 _req: Request<Empty>,
904 ) -> std::result::Result<Response<vm::GetLastStateSummaryResponse>, tonic::Status> {
905 log::debug!("get_last_state_summary called");
906
907 Err(tonic::Status::unimplemented("get_last_state_summary"))
908 }
909
910 async fn state_summary_accept(
911 &self,
912 _req: Request<vm::StateSummaryAcceptRequest>,
913 ) -> std::result::Result<tonic::Response<vm::StateSummaryAcceptResponse>, tonic::Status> {
914 log::debug!("state_summary_accept called");
915
916 Err(tonic::Status::unimplemented("state_summary_accept"))
917 }
918
919 async fn verify_height_index(
920 &self,
921 _req: Request<Empty>,
922 ) -> std::result::Result<Response<vm::VerifyHeightIndexResponse>, tonic::Status> {
923 log::debug!("verify_height_index called");
924
925 let inner_vm = self.vm.read().await;
926
927 match inner_vm.verify_height_index().await {
928 Ok(_) => return Ok(Response::new(vm::VerifyHeightIndexResponse { err: 0 })),
929 Err(e) => {
930 if error_to_error_code(&e.to_string()) != 0 {
931 return Ok(Response::new(vm::VerifyHeightIndexResponse {
932 err: error_to_error_code(&e.to_string()),
933 }));
934 }
935 return Err(tonic::Status::unknown(e.to_string()));
936 }
937 }
938 }
939
940 async fn get_block_id_at_height(
941 &self,
942 req: Request<vm::GetBlockIdAtHeightRequest>,
943 ) -> std::result::Result<Response<vm::GetBlockIdAtHeightResponse>, tonic::Status> {
944 log::debug!("get_block_id_at_height called");
945
946 let msg = req.into_inner();
947 let inner_vm = self.vm.read().await;
948
949 match inner_vm.get_block_id_at_height(msg.height).await {
950 Ok(height) => {
951 return Ok(Response::new(vm::GetBlockIdAtHeightResponse {
952 blk_id: height.to_vec().into(),
953 err: 0,
954 }))
955 }
956 Err(e) => {
957 if error_to_error_code(&e.to_string()) != 0 {
958 return Ok(Response::new(vm::GetBlockIdAtHeightResponse {
959 blk_id: vec![].into(),
960 err: error_to_error_code(&e.to_string()),
961 }));
962 }
963 return Err(tonic::Status::unknown(e.to_string()));
964 }
965 }
966 }
967}