1#[cfg(with_metrics)]
7use std::sync::LazyLock;
8#[cfg(not(web))]
9use std::time::Duration;
10
11use custom_debug_derive::Debug;
12use futures::{channel::mpsc, StreamExt as _};
13#[cfg(with_metrics)]
14use linera_base::prometheus_util::{
15 exponential_bucket_latencies, register_histogram_vec, MeasureLatency as _,
16};
17use linera_base::{
18 data_types::{
19 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, Timestamp,
20 },
21 ensure, hex_debug, hex_vec_debug, http,
22 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, MessageId, StreamId},
23 ownership::ChainOwnership,
24};
25use linera_views::{batch::Batch, context::Context, views::View};
26use oneshot::Sender;
27#[cfg(with_metrics)]
28use prometheus::HistogramVec;
29use reqwest::{header::HeaderMap, Client, Url};
30
31use crate::{
32 system::{CreateApplicationResult, OpenChainConfig, Recipient},
33 util::RespondExt,
34 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
35 ExecutionStateView, ModuleId, OutgoingMessage, ResourceController, TransactionTracker,
36 UserContractCode, UserServiceCode,
37};
38
/// Histogram recording the latency measured by `load_contract`.
///
/// The metric is registered the first time this static is accessed.
#[cfg(with_metrics)]
static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    // NOTE(review): `250.0` is presumably the largest latency bucket — confirm against
    // `exponential_bucket_latencies`.
    let buckets = exponential_bucket_latencies(250.0);
    register_histogram_vec(
        "load_contract_latency",
        "Load contract latency",
        &[],
        buckets,
    )
});
49
/// Histogram recording the latency measured by `load_service`.
///
/// The metric is registered the first time this static is accessed.
#[cfg(with_metrics)]
static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    // NOTE(review): `250.0` is presumably the largest latency bucket — confirm against
    // `exponential_bucket_latencies`.
    let buckets = exponential_bucket_latencies(250.0);
    register_histogram_vec(
        "load_service_latency",
        "Load service latency",
        &[],
        buckets,
    )
});
60
/// Sending half of an unbounded channel carrying [`ExecutionRequest`]s.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
62
63impl<C> ExecutionStateView<C>
64where
65 C: Context + Clone + Send + Sync + 'static,
66 C::Extra: ExecutionRuntimeContext,
67{
68 pub(crate) async fn load_contract(
69 &mut self,
70 id: ApplicationId,
71 txn_tracker: &mut TransactionTracker,
72 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
73 #[cfg(with_metrics)]
74 let _latency = LOAD_CONTRACT_LATENCY.measure_latency();
75 let blob_id = id.description_blob_id();
76 let description = match txn_tracker.created_blobs().get(&blob_id) {
77 Some(description) => {
78 let blob = description.clone();
79 bcs::from_bytes(blob.bytes())?
80 }
81 None => {
82 self.system
83 .describe_application(id, Some(txn_tracker))
84 .await?
85 }
86 };
87 let code = self
88 .context()
89 .extra()
90 .get_user_contract(&description)
91 .await?;
92 Ok((code, description))
93 }
94
95 pub(crate) async fn load_service(
96 &mut self,
97 id: ApplicationId,
98 txn_tracker: Option<&mut TransactionTracker>,
99 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
100 #[cfg(with_metrics)]
101 let _latency = LOAD_SERVICE_LATENCY.measure_latency();
102 let blob_id = id.description_blob_id();
103 let description = match txn_tracker
104 .as_ref()
105 .and_then(|tracker| tracker.created_blobs().get(&blob_id))
106 {
107 Some(description) => {
108 let blob = description.clone();
109 bcs::from_bytes(blob.bytes())?
110 }
111 None => self.system.describe_application(id, txn_tracker).await?,
112 };
113 let code = self
114 .context()
115 .extra()
116 .get_user_service(&description)
117 .await?;
118 Ok((code, description))
119 }
120
/// Handles one [`ExecutionRequest`] against this execution state.
///
/// Every variant carries a `callback`; the result of the request is passed to it with
/// `respond`. Errors propagated with `?` are returned to the caller instead, and in that
/// case this function returns before answering the callback.
///
/// `resource_controller` is only used by the blob requests (`ReadBlobContent` and
/// `AssertBlobExists`).
pub(crate) async fn handle_request(
    &mut self,
    request: ExecutionRequest,
    resource_controller: &mut ResourceController<Option<AccountOwner>>,
) -> Result<(), ExecutionError> {
    use ExecutionRequest::*;
    match request {
        // The transaction tracker travels with the request and is handed back to the
        // requester together with the loaded code.
        #[cfg(not(web))]
        LoadContract {
            id,
            callback,
            mut txn_tracker,
        } => {
            let (code, description) = self.load_contract(id, &mut txn_tracker).await?;
            callback.respond((code, description, txn_tracker))
        }
        #[cfg(not(web))]
        LoadService {
            id,
            callback,
            mut txn_tracker,
        } => {
            let (code, description) = self.load_service(id, Some(&mut txn_tracker)).await?;
            callback.respond((code, description, txn_tracker))
        }

        ChainBalance { callback } => {
            let balance = *self.system.balance.get();
            callback.respond(balance);
        }

        OwnerBalance { owner, callback } => {
            // An owner without an entry gets the default amount.
            let balance = self.system.balances.get(&owner).await?.unwrap_or_default();
            callback.respond(balance);
        }

        OwnerBalances { callback } => {
            let balances = self.system.balances.index_values().await?;
            callback.respond(balances.into_iter().collect());
        }

        BalanceOwners { callback } => {
            let owners = self.system.balances.indices().await?;
            callback.respond(owners);
        }

        Transfer {
            source,
            destination,
            amount,
            signer,
            application_id,
            callback,
        } => callback.respond(
            self.system
                .transfer(
                    signer,
                    Some(application_id),
                    source,
                    Recipient::Account(destination),
                    amount,
                )
                .await?,
        ),

        Claim {
            source,
            destination,
            amount,
            signer,
            application_id,
            callback,
        } => callback.respond(
            self.system
                .claim(
                    signer,
                    Some(application_id),
                    source.owner,
                    source.chain_id,
                    Recipient::Account(destination),
                    amount,
                )
                .await?,
        ),

        SystemTimestamp { callback } => {
            let timestamp = *self.system.timestamp.get();
            callback.respond(timestamp);
        }

        ChainOwnership { callback } => {
            let ownership = self.system.ownership.get().clone();
            callback.respond(ownership);
        }

        // For the read-only key-value requests below, an application without a loaded
        // entry in `self.users` is answered as if its state were empty.
        ContainsKey { id, key, callback } => {
            let view = self.users.try_load_entry(&id).await?;
            let result = match view {
                Some(view) => view.contains_key(&key).await?,
                None => false,
            };
            callback.respond(result);
        }

        ContainsKeys { id, keys, callback } => {
            let view = self.users.try_load_entry(&id).await?;
            let result = match view {
                Some(view) => view.contains_keys(keys).await?,
                // One `false` per requested key.
                None => vec![false; keys.len()],
            };
            callback.respond(result);
        }

        ReadMultiValuesBytes { id, keys, callback } => {
            let view = self.users.try_load_entry(&id).await?;
            let values = match view {
                Some(view) => view.multi_get(keys).await?,
                // One `None` per requested key.
                None => vec![None; keys.len()],
            };
            callback.respond(values);
        }

        ReadValueBytes { id, key, callback } => {
            let view = self.users.try_load_entry(&id).await?;
            let result = match view {
                Some(view) => view.get(&key).await?,
                None => None,
            };
            callback.respond(result);
        }

        FindKeysByPrefix {
            id,
            key_prefix,
            callback,
        } => {
            let view = self.users.try_load_entry(&id).await?;
            let result = match view {
                Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
                None => Vec::new(),
            };
            callback.respond(result);
        }

        FindKeyValuesByPrefix {
            id,
            key_prefix,
            callback,
        } => {
            let view = self.users.try_load_entry(&id).await?;
            let result = match view {
                Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
                None => Vec::new(),
            };
            callback.respond(result);
        }

        WriteBatch {
            id,
            batch,
            callback,
        } => {
            let mut view = self.users.try_load_entry_mut(&id).await?;
            view.write_batch(batch).await?;
            callback.respond(());
        }

        OpenChain {
            ownership,
            balance,
            next_message_id,
            application_permissions,
            callback,
        } => {
            // A chain without an admin ID or an epoch is reported as inactive.
            let inactive_err = || ExecutionError::InactiveChain;
            let config = OpenChainConfig {
                ownership,
                admin_id: self.system.admin_id.get().ok_or_else(inactive_err)?,
                epoch: self.system.epoch.get().ok_or_else(inactive_err)?,
                committees: self.system.committees.get().clone(),
                balance,
                application_permissions,
            };
            callback.respond(self.system.open_chain(config, next_message_id).await?);
        }

        CloseChain {
            application_id,
            callback,
        } => {
            // A missing permission is reported through the callback rather than
            // returned as an error of this function.
            let app_permissions = self.system.application_permissions.get();
            if !app_permissions.can_close_chain(&application_id) {
                callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
            } else {
                self.system.close_chain().await?;
                callback.respond(Ok(()));
            }
        }

        ChangeApplicationPermissions {
            application_id,
            application_permissions,
            callback,
        } => {
            // A missing permission is reported through the callback rather than
            // returned as an error of this function.
            let app_permissions = self.system.application_permissions.get();
            if !app_permissions.can_change_application_permissions(&application_id) {
                callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
            } else {
                self.system
                    .application_permissions
                    .set(application_permissions);
                callback.respond(Ok(()));
            }
        }

        CreateApplication {
            chain_id,
            block_height,
            module_id,
            parameters,
            required_application_ids,
            callback,
            txn_tracker,
        } => {
            // A failure here is returned with `?`, so the callback only ever
            // receives `Ok`.
            let create_application_result = self
                .system
                .create_application(
                    chain_id,
                    block_height,
                    module_id,
                    parameters,
                    required_application_ids,
                    txn_tracker,
                )
                .await?;
            callback.respond(Ok(create_application_result));
        }

        PerformHttpRequest {
            request,
            http_responses_are_oracle_responses,
            callback,
        } => {
            // Convert the request headers, failing on the first one that does not parse.
            let headers = request
                .headers
                .into_iter()
                .map(|http::Header { name, value }| Ok((name.parse()?, value.try_into()?)))
                .collect::<Result<HeaderMap, ExecutionError>>()?;

            let url = Url::parse(&request.url)?;
            let host = url
                .host_str()
                .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;

            // Without a current committee there is no policy to authorize the request.
            let (_epoch, committee) = self
                .system
                .current_committee()
                .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
            let allowed_hosts = &committee.policy().http_request_allow_list;

            // Only hosts on the committee policy's allow list may be contacted.
            ensure!(
                allowed_hosts.contains(host),
                ExecutionError::UnauthorizedHttpRequest(url)
            );

            #[cfg_attr(web, allow(unused_mut))]
            let mut request = Client::new()
                .request(request.method.into(), url)
                .body(request.body)
                .headers(headers);
            // The policy's request timeout is only applied on non-web builds.
            #[cfg(not(web))]
            {
                request = request.timeout(Duration::from_millis(
                    committee.policy().http_request_timeout_ms,
                ));
            }

            let response = request.send().await?;

            let mut response_size_limit = committee.policy().maximum_http_response_bytes;

            // Oracle responses are additionally capped by the oracle response limit.
            if http_responses_are_oracle_responses {
                response_size_limit =
                    response_size_limit.min(committee.policy().maximum_oracle_response_bytes);
            }

            callback.respond(
                self.receive_http_response(response, response_size_limit)
                    .await?,
            );
        }

        ReadBlobContent { blob_id, callback } => {
            let blob = self.system.read_blob_content(blob_id).await?;
            // Only `Data` blobs are passed to the blob-read resource tracking.
            if blob_id.blob_type == BlobType::Data {
                resource_controller
                    .with_state(&mut self.system)
                    .await?
                    .track_blob_read(blob.bytes().len() as u64)?;
            }
            let is_new = self.system.blob_used(None, blob_id).await?;
            callback.respond((blob, is_new))
        }

        AssertBlobExists { blob_id, callback } => {
            self.system.assert_blob_exists(blob_id).await?;
            if blob_id.blob_type == BlobType::Data {
                // No content is read, so this is tracked as a read of zero bytes.
                resource_controller
                    .with_state(&mut self.system)
                    .await?
                    .track_blob_read(0)?;
            }
            callback.respond(self.system.blob_used(None, blob_id).await?)
        }

        NextEventIndex {
            stream_id,
            callback,
        } => {
            // Respond with the current count and store the incremented value,
            // failing on overflow instead of wrapping.
            let count = self
                .stream_event_counts
                .get_mut_or_default(&stream_id)
                .await?;
            let index = *count;
            *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
            callback.respond(index)
        }

        ReadEvent { event_id, callback } => {
            let event_value = self.context().extra().get_event(event_id).await?;
            callback.respond(event_value);
        }

        SubscribeToEvents {
            chain_id,
            stream_id,
            subscriber_app_id,
            callback,
        } => {
            let subscriptions = self
                .system
                .event_subscriptions
                .get_mut_or_default(&(chain_id, stream_id))
                .await?;
            subscriptions.applications.insert(subscriber_app_id);
            callback.respond(());
        }

        UnsubscribeFromEvents {
            chain_id,
            stream_id,
            subscriber_app_id,
            callback,
        } => {
            let key = (chain_id, stream_id);
            let subscriptions = self
                .system
                .event_subscriptions
                .get_mut_or_default(&key)
                .await?;
            subscriptions.applications.remove(&subscriber_app_id);
            // Drop the whole entry once no application is subscribed any more.
            if subscriptions.applications.is_empty() {
                self.system.event_subscriptions.remove(&key)?;
            }
            callback.respond(());
        }

        GetApplicationPermissions { callback } => {
            let app_permissions = self.system.application_permissions.get();
            callback.respond(app_permissions.clone());
        }
    }

    Ok(())
}
498}
499
500impl<C> ExecutionStateView<C>
501where
502 C: Context + Clone + Send + Sync + 'static,
503 C::Extra: ExecutionRuntimeContext,
504{
505 async fn receive_http_response(
509 &mut self,
510 response: reqwest::Response,
511 size_limit: u64,
512 ) -> Result<http::Response, ExecutionError> {
513 let status = response.status().as_u16();
514 let maybe_content_length = response.content_length();
515
516 let headers = response
517 .headers()
518 .iter()
519 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
520 .collect::<Vec<_>>();
521
522 let total_header_size = headers
523 .iter()
524 .map(|header| (header.name.len() + header.value.len()) as u64)
525 .sum();
526
527 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
528 ExecutionError::HttpResponseSizeLimitExceeded {
529 limit: size_limit,
530 size: total_header_size,
531 },
532 )?;
533
534 if let Some(content_length) = maybe_content_length {
535 if content_length > remaining_bytes {
536 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
537 limit: size_limit,
538 size: content_length + total_header_size,
539 });
540 }
541 }
542
543 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
544 let mut body_stream = response.bytes_stream();
545
546 while let Some(bytes) = body_stream.next().await.transpose()? {
547 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
548 ExecutionError::HttpResponseSizeLimitExceeded {
549 limit: size_limit,
550 size: bytes.len() as u64 + (size_limit - remaining_bytes),
551 },
552 )?;
553
554 body.extend(&bytes);
555 }
556
557 Ok(http::Response {
558 status,
559 headers,
560 body,
561 })
562 }
563}
564
565#[derive(Debug)]
567pub enum ExecutionRequest {
568 #[cfg(not(web))]
569 LoadContract {
570 id: ApplicationId,
571 #[debug(skip)]
572 callback: Sender<(UserContractCode, ApplicationDescription, TransactionTracker)>,
573 #[debug(skip)]
574 txn_tracker: TransactionTracker,
575 },
576
577 #[cfg(not(web))]
578 LoadService {
579 id: ApplicationId,
580 #[debug(skip)]
581 callback: Sender<(UserServiceCode, ApplicationDescription, TransactionTracker)>,
582 #[debug(skip)]
583 txn_tracker: TransactionTracker,
584 },
585
586 ChainBalance {
587 #[debug(skip)]
588 callback: Sender<Amount>,
589 },
590
591 OwnerBalance {
592 owner: AccountOwner,
593 #[debug(skip)]
594 callback: Sender<Amount>,
595 },
596
597 OwnerBalances {
598 #[debug(skip)]
599 callback: Sender<Vec<(AccountOwner, Amount)>>,
600 },
601
602 BalanceOwners {
603 #[debug(skip)]
604 callback: Sender<Vec<AccountOwner>>,
605 },
606
607 Transfer {
608 source: AccountOwner,
609 destination: Account,
610 amount: Amount,
611 #[debug(skip_if = Option::is_none)]
612 signer: Option<AccountOwner>,
613 application_id: ApplicationId,
614 #[debug(skip)]
615 callback: Sender<Option<OutgoingMessage>>,
616 },
617
618 Claim {
619 source: Account,
620 destination: Account,
621 amount: Amount,
622 #[debug(skip_if = Option::is_none)]
623 signer: Option<AccountOwner>,
624 application_id: ApplicationId,
625 #[debug(skip)]
626 callback: Sender<OutgoingMessage>,
627 },
628
629 SystemTimestamp {
630 #[debug(skip)]
631 callback: Sender<Timestamp>,
632 },
633
634 ChainOwnership {
635 #[debug(skip)]
636 callback: Sender<ChainOwnership>,
637 },
638
639 ReadValueBytes {
640 id: ApplicationId,
641 #[debug(with = hex_debug)]
642 key: Vec<u8>,
643 #[debug(skip)]
644 callback: Sender<Option<Vec<u8>>>,
645 },
646
647 ContainsKey {
648 id: ApplicationId,
649 key: Vec<u8>,
650 #[debug(skip)]
651 callback: Sender<bool>,
652 },
653
654 ContainsKeys {
655 id: ApplicationId,
656 #[debug(with = hex_vec_debug)]
657 keys: Vec<Vec<u8>>,
658 callback: Sender<Vec<bool>>,
659 },
660
661 ReadMultiValuesBytes {
662 id: ApplicationId,
663 #[debug(with = hex_vec_debug)]
664 keys: Vec<Vec<u8>>,
665 #[debug(skip)]
666 callback: Sender<Vec<Option<Vec<u8>>>>,
667 },
668
669 FindKeysByPrefix {
670 id: ApplicationId,
671 #[debug(with = hex_debug)]
672 key_prefix: Vec<u8>,
673 #[debug(skip)]
674 callback: Sender<Vec<Vec<u8>>>,
675 },
676
677 FindKeyValuesByPrefix {
678 id: ApplicationId,
679 #[debug(with = hex_debug)]
680 key_prefix: Vec<u8>,
681 #[debug(skip)]
682 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
683 },
684
685 WriteBatch {
686 id: ApplicationId,
687 batch: Batch,
688 #[debug(skip)]
689 callback: Sender<()>,
690 },
691
692 OpenChain {
693 ownership: ChainOwnership,
694 #[debug(skip_if = Amount::is_zero)]
695 balance: Amount,
696 next_message_id: MessageId,
697 application_permissions: ApplicationPermissions,
698 #[debug(skip)]
699 callback: Sender<OutgoingMessage>,
700 },
701
702 CloseChain {
703 application_id: ApplicationId,
704 #[debug(skip)]
705 callback: Sender<Result<(), ExecutionError>>,
706 },
707
708 ChangeApplicationPermissions {
709 application_id: ApplicationId,
710 application_permissions: ApplicationPermissions,
711 #[debug(skip)]
712 callback: Sender<Result<(), ExecutionError>>,
713 },
714
715 CreateApplication {
716 chain_id: ChainId,
717 block_height: BlockHeight,
718 module_id: ModuleId,
719 parameters: Vec<u8>,
720 required_application_ids: Vec<ApplicationId>,
721 #[debug(skip)]
722 txn_tracker: TransactionTracker,
723 #[debug(skip)]
724 callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
725 },
726
727 PerformHttpRequest {
728 request: http::Request,
729 http_responses_are_oracle_responses: bool,
730 #[debug(skip)]
731 callback: Sender<http::Response>,
732 },
733
734 ReadBlobContent {
735 blob_id: BlobId,
736 #[debug(skip)]
737 callback: Sender<(BlobContent, bool)>,
738 },
739
740 AssertBlobExists {
741 blob_id: BlobId,
742 #[debug(skip)]
743 callback: Sender<bool>,
744 },
745
746 NextEventIndex {
747 stream_id: StreamId,
748 #[debug(skip)]
749 callback: Sender<u32>,
750 },
751
752 ReadEvent {
753 event_id: EventId,
754 callback: oneshot::Sender<Vec<u8>>,
755 },
756
757 SubscribeToEvents {
758 chain_id: ChainId,
759 stream_id: StreamId,
760 subscriber_app_id: ApplicationId,
761 #[debug(skip)]
762 callback: Sender<()>,
763 },
764
765 UnsubscribeFromEvents {
766 chain_id: ChainId,
767 stream_id: StreamId,
768 subscriber_app_id: ApplicationId,
769 #[debug(skip)]
770 callback: Sender<()>,
771 },
772
773 GetApplicationPermissions {
774 #[debug(skip)]
775 callback: Sender<ApplicationPermissions>,
776 },
777}