1use custom_debug_derive::Debug;
7use futures::{channel::mpsc, StreamExt as _};
8#[cfg(with_metrics)]
9use linera_base::prometheus_util::MeasureLatency as _;
10use linera_base::{
11 data_types::{
12 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, Timestamp,
13 },
14 ensure, hex_debug, hex_vec_debug, http,
15 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
16 ownership::ChainOwnership,
17};
18use linera_views::{batch::Batch, context::Context, views::View};
19use oneshot::Sender;
20use reqwest::{header::HeaderMap, Client, Url};
21
22use crate::{
23 system::{CreateApplicationResult, OpenChainConfig},
24 util::RespondExt,
25 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
26 ExecutionStateView, ModuleId, OutgoingMessage, ResourceController, TransactionTracker,
27 UserContractCode, UserServiceCode,
28};
29
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a histogram without labels, using the bucket configuration shared by
    /// both metrics of this module.
    fn register_latency_histogram(name: &'static str, description: &'static str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(250.0))
    }

    /// Latency histogram observed by `load_contract`.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_latency_histogram("load_contract_latency", "Load contract latency")
    });

    /// Latency histogram observed by `load_service`.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_latency_histogram("load_service_latency", "Load service latency")
    });
}
57
/// Sending half of an unbounded channel that carries [`ExecutionRequest`] values.
// NOTE(review): the receiving side presumably feeds `ExecutionStateView::handle_request`;
// confirm against the code that creates the channel.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
59
60impl<C> ExecutionStateView<C>
61where
62 C: Context + Clone + Send + Sync + 'static,
63 C::Extra: ExecutionRuntimeContext,
64{
65 pub(crate) async fn load_contract(
66 &mut self,
67 id: ApplicationId,
68 txn_tracker: &mut TransactionTracker,
69 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
70 #[cfg(with_metrics)]
71 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
72 let blob_id = id.description_blob_id();
73 let description = match txn_tracker.get_blob_content(&blob_id) {
74 Some(blob) => bcs::from_bytes(blob.bytes())?,
75 None => self.system.describe_application(id, txn_tracker).await?,
76 };
77 let code = self
78 .context()
79 .extra()
80 .get_user_contract(&description, txn_tracker)
81 .await?;
82 Ok((code, description))
83 }
84
85 pub(crate) async fn load_service(
86 &mut self,
87 id: ApplicationId,
88 txn_tracker: &mut TransactionTracker,
89 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
90 #[cfg(with_metrics)]
91 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
92 let blob_id = id.description_blob_id();
93 let description = match txn_tracker.get_blob_content(&blob_id) {
94 Some(blob) => bcs::from_bytes(blob.bytes())?,
95 None => self.system.describe_application(id, txn_tracker).await?,
96 };
97 let code = self
98 .context()
99 .extra()
100 .get_user_service(&description, txn_tracker)
101 .await?;
102 Ok((code, description))
103 }
104
105 pub(crate) async fn handle_request(
107 &mut self,
108 request: ExecutionRequest,
109 resource_controller: &mut ResourceController<Option<AccountOwner>>,
110 ) -> Result<(), ExecutionError> {
111 use ExecutionRequest::*;
112 match request {
113 #[cfg(not(web))]
114 LoadContract {
115 id,
116 callback,
117 mut txn_tracker,
118 } => {
119 let (code, description) = self.load_contract(id, &mut txn_tracker).await?;
120 callback.respond((code, description, txn_tracker))
121 }
122 #[cfg(not(web))]
123 LoadService {
124 id,
125 callback,
126 mut txn_tracker,
127 } => {
128 let (code, description) = self.load_service(id, &mut txn_tracker).await?;
129 callback.respond((code, description, txn_tracker))
130 }
131
132 ChainBalance { callback } => {
133 let balance = *self.system.balance.get();
134 callback.respond(balance);
135 }
136
137 OwnerBalance { owner, callback } => {
138 let balance = self.system.balances.get(&owner).await?.unwrap_or_default();
139 callback.respond(balance);
140 }
141
142 OwnerBalances { callback } => {
143 let balances = self.system.balances.index_values().await?;
144 callback.respond(balances.into_iter().collect());
145 }
146
147 BalanceOwners { callback } => {
148 let owners = self.system.balances.indices().await?;
149 callback.respond(owners);
150 }
151
152 Transfer {
153 source,
154 destination,
155 amount,
156 signer,
157 application_id,
158 callback,
159 } => callback.respond(
160 self.system
161 .transfer(signer, Some(application_id), source, destination, amount)
162 .await?,
163 ),
164
165 Claim {
166 source,
167 destination,
168 amount,
169 signer,
170 application_id,
171 callback,
172 } => callback.respond(
173 self.system
174 .claim(
175 signer,
176 Some(application_id),
177 source.owner,
178 source.chain_id,
179 destination,
180 amount,
181 )
182 .await?,
183 ),
184
185 SystemTimestamp { callback } => {
186 let timestamp = *self.system.timestamp.get();
187 callback.respond(timestamp);
188 }
189
190 ChainOwnership { callback } => {
191 let ownership = self.system.ownership.get().clone();
192 callback.respond(ownership);
193 }
194
195 ContainsKey { id, key, callback } => {
196 let view = self.users.try_load_entry(&id).await?;
197 let result = match view {
198 Some(view) => view.contains_key(&key).await?,
199 None => false,
200 };
201 callback.respond(result);
202 }
203
204 ContainsKeys { id, keys, callback } => {
205 let view = self.users.try_load_entry(&id).await?;
206 let result = match view {
207 Some(view) => view.contains_keys(keys).await?,
208 None => vec![false; keys.len()],
209 };
210 callback.respond(result);
211 }
212
213 ReadMultiValuesBytes { id, keys, callback } => {
214 let view = self.users.try_load_entry(&id).await?;
215 let values = match view {
216 Some(view) => view.multi_get(keys).await?,
217 None => vec![None; keys.len()],
218 };
219 callback.respond(values);
220 }
221
222 ReadValueBytes { id, key, callback } => {
223 let view = self.users.try_load_entry(&id).await?;
224 let result = match view {
225 Some(view) => view.get(&key).await?,
226 None => None,
227 };
228 callback.respond(result);
229 }
230
231 FindKeysByPrefix {
232 id,
233 key_prefix,
234 callback,
235 } => {
236 let view = self.users.try_load_entry(&id).await?;
237 let result = match view {
238 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
239 None => Vec::new(),
240 };
241 callback.respond(result);
242 }
243
244 FindKeyValuesByPrefix {
245 id,
246 key_prefix,
247 callback,
248 } => {
249 let view = self.users.try_load_entry(&id).await?;
250 let result = match view {
251 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
252 None => Vec::new(),
253 };
254 callback.respond(result);
255 }
256
257 WriteBatch {
258 id,
259 batch,
260 callback,
261 } => {
262 let mut view = self.users.try_load_entry_mut(&id).await?;
263 view.write_batch(batch).await?;
264 callback.respond(());
265 }
266
267 OpenChain {
268 ownership,
269 balance,
270 parent_id,
271 block_height,
272 application_permissions,
273 timestamp,
274 callback,
275 mut txn_tracker,
276 } => {
277 let config = OpenChainConfig {
278 ownership,
279 balance,
280 application_permissions,
281 };
282 let chain_id = self
283 .system
284 .open_chain(config, parent_id, block_height, timestamp, &mut txn_tracker)
285 .await?;
286 callback.respond((chain_id, txn_tracker));
287 }
288
289 CloseChain {
290 application_id,
291 callback,
292 } => {
293 let app_permissions = self.system.application_permissions.get();
294 if !app_permissions.can_close_chain(&application_id) {
295 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
296 } else {
297 self.system.close_chain().await?;
298 callback.respond(Ok(()));
299 }
300 }
301
302 ChangeApplicationPermissions {
303 application_id,
304 application_permissions,
305 callback,
306 } => {
307 let app_permissions = self.system.application_permissions.get();
308 if !app_permissions.can_change_application_permissions(&application_id) {
309 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
310 } else {
311 self.system
312 .application_permissions
313 .set(application_permissions);
314 callback.respond(Ok(()));
315 }
316 }
317
318 CreateApplication {
319 chain_id,
320 block_height,
321 module_id,
322 parameters,
323 required_application_ids,
324 callback,
325 txn_tracker,
326 } => {
327 let create_application_result = self
328 .system
329 .create_application(
330 chain_id,
331 block_height,
332 module_id,
333 parameters,
334 required_application_ids,
335 txn_tracker,
336 )
337 .await?;
338 callback.respond(Ok(create_application_result));
339 }
340
341 PerformHttpRequest {
342 request,
343 http_responses_are_oracle_responses,
344 callback,
345 } => {
346 let headers = request
347 .headers
348 .into_iter()
349 .map(|http::Header { name, value }| Ok((name.parse()?, value.try_into()?)))
350 .collect::<Result<HeaderMap, ExecutionError>>()?;
351
352 let url = Url::parse(&request.url)?;
353 let host = url
354 .host_str()
355 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
356
357 let (_epoch, committee) = self
358 .system
359 .current_committee()
360 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
361 let allowed_hosts = &committee.policy().http_request_allow_list;
362
363 ensure!(
364 allowed_hosts.contains(host),
365 ExecutionError::UnauthorizedHttpRequest(url)
366 );
367
368 let request = Client::new()
369 .request(request.method.into(), url)
370 .body(request.body)
371 .headers(headers);
372 #[cfg(not(web))]
373 let request = request.timeout(linera_base::time::Duration::from_millis(
374 committee.policy().http_request_timeout_ms,
375 ));
376
377 let response = request.send().await?;
378
379 let mut response_size_limit = committee.policy().maximum_http_response_bytes;
380
381 if http_responses_are_oracle_responses {
382 response_size_limit =
383 response_size_limit.min(committee.policy().maximum_oracle_response_bytes);
384 }
385
386 callback.respond(
387 self.receive_http_response(response, response_size_limit)
388 .await?,
389 );
390 }
391
392 ReadBlobContent { blob_id, callback } => {
393 let content = self.system.read_blob_content(blob_id).await?;
394 if blob_id.blob_type == BlobType::Data {
395 resource_controller
396 .with_state(&mut self.system)
397 .await?
398 .track_blob_read(content.bytes().len() as u64)?;
399 }
400 let is_new = self
401 .system
402 .blob_used(&mut TransactionTracker::default(), blob_id)
403 .await?;
404 callback.respond((content, is_new))
405 }
406
407 AssertBlobExists { blob_id, callback } => {
408 self.system.assert_blob_exists(blob_id).await?;
409 if blob_id.blob_type == BlobType::Data {
411 resource_controller
412 .with_state(&mut self.system)
413 .await?
414 .track_blob_read(0)?;
415 }
416 callback.respond(
417 self.system
418 .blob_used(&mut TransactionTracker::default(), blob_id)
419 .await?,
420 )
421 }
422
423 NextEventIndex {
424 stream_id,
425 callback,
426 } => {
427 let count = self
428 .stream_event_counts
429 .get_mut_or_default(&stream_id)
430 .await?;
431 let index = *count;
432 *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
433 callback.respond(index)
434 }
435
436 ReadEvent { event_id, callback } => {
437 let event = self.context().extra().get_event(event_id.clone()).await?;
438 let bytes = event.ok_or(ExecutionError::EventsNotFound(vec![event_id]))?;
439 callback.respond(bytes);
440 }
441
442 SubscribeToEvents {
443 chain_id,
444 stream_id,
445 subscriber_app_id,
446 callback,
447 } => {
448 let subscriptions = self
449 .system
450 .event_subscriptions
451 .get_mut_or_default(&(chain_id, stream_id))
452 .await?;
453 let next_index = if subscriptions.applications.insert(subscriber_app_id) {
454 subscriptions.next_index
455 } else {
456 0
457 };
458 callback.respond(next_index);
459 }
460
461 UnsubscribeFromEvents {
462 chain_id,
463 stream_id,
464 subscriber_app_id,
465 callback,
466 } => {
467 let key = (chain_id, stream_id);
468 let subscriptions = self
469 .system
470 .event_subscriptions
471 .get_mut_or_default(&key)
472 .await?;
473 subscriptions.applications.remove(&subscriber_app_id);
474 if subscriptions.applications.is_empty() {
475 self.system.event_subscriptions.remove(&key)?;
476 }
477 callback.respond(());
478 }
479
480 GetApplicationPermissions { callback } => {
481 let app_permissions = self.system.application_permissions.get();
482 callback.respond(app_permissions.clone());
483 }
484 }
485
486 Ok(())
487 }
488}
489
490impl<C> ExecutionStateView<C>
491where
492 C: Context + Clone + Send + Sync + 'static,
493 C::Extra: ExecutionRuntimeContext,
494{
495 async fn receive_http_response(
499 &mut self,
500 response: reqwest::Response,
501 size_limit: u64,
502 ) -> Result<http::Response, ExecutionError> {
503 let status = response.status().as_u16();
504 let maybe_content_length = response.content_length();
505
506 let headers = response
507 .headers()
508 .iter()
509 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
510 .collect::<Vec<_>>();
511
512 let total_header_size = headers
513 .iter()
514 .map(|header| (header.name.len() + header.value.len()) as u64)
515 .sum();
516
517 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
518 ExecutionError::HttpResponseSizeLimitExceeded {
519 limit: size_limit,
520 size: total_header_size,
521 },
522 )?;
523
524 if let Some(content_length) = maybe_content_length {
525 if content_length > remaining_bytes {
526 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
527 limit: size_limit,
528 size: content_length + total_header_size,
529 });
530 }
531 }
532
533 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
534 let mut body_stream = response.bytes_stream();
535
536 while let Some(bytes) = body_stream.next().await.transpose()? {
537 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
538 ExecutionError::HttpResponseSizeLimitExceeded {
539 limit: size_limit,
540 size: bytes.len() as u64 + (size_limit - remaining_bytes),
541 },
542 )?;
543
544 body.extend(&bytes);
545 }
546
547 Ok(http::Response {
548 status,
549 headers,
550 body,
551 })
552 }
553}
554
555#[derive(Debug)]
557pub enum ExecutionRequest {
558 #[cfg(not(web))]
559 LoadContract {
560 id: ApplicationId,
561 #[debug(skip)]
562 callback: Sender<(UserContractCode, ApplicationDescription, TransactionTracker)>,
563 #[debug(skip)]
564 txn_tracker: TransactionTracker,
565 },
566
567 #[cfg(not(web))]
568 LoadService {
569 id: ApplicationId,
570 #[debug(skip)]
571 callback: Sender<(UserServiceCode, ApplicationDescription, TransactionTracker)>,
572 #[debug(skip)]
573 txn_tracker: TransactionTracker,
574 },
575
576 ChainBalance {
577 #[debug(skip)]
578 callback: Sender<Amount>,
579 },
580
581 OwnerBalance {
582 owner: AccountOwner,
583 #[debug(skip)]
584 callback: Sender<Amount>,
585 },
586
587 OwnerBalances {
588 #[debug(skip)]
589 callback: Sender<Vec<(AccountOwner, Amount)>>,
590 },
591
592 BalanceOwners {
593 #[debug(skip)]
594 callback: Sender<Vec<AccountOwner>>,
595 },
596
597 Transfer {
598 source: AccountOwner,
599 destination: Account,
600 amount: Amount,
601 #[debug(skip_if = Option::is_none)]
602 signer: Option<AccountOwner>,
603 application_id: ApplicationId,
604 #[debug(skip)]
605 callback: Sender<Option<OutgoingMessage>>,
606 },
607
608 Claim {
609 source: Account,
610 destination: Account,
611 amount: Amount,
612 #[debug(skip_if = Option::is_none)]
613 signer: Option<AccountOwner>,
614 application_id: ApplicationId,
615 #[debug(skip)]
616 callback: Sender<Option<OutgoingMessage>>,
617 },
618
619 SystemTimestamp {
620 #[debug(skip)]
621 callback: Sender<Timestamp>,
622 },
623
624 ChainOwnership {
625 #[debug(skip)]
626 callback: Sender<ChainOwnership>,
627 },
628
629 ReadValueBytes {
630 id: ApplicationId,
631 #[debug(with = hex_debug)]
632 key: Vec<u8>,
633 #[debug(skip)]
634 callback: Sender<Option<Vec<u8>>>,
635 },
636
637 ContainsKey {
638 id: ApplicationId,
639 key: Vec<u8>,
640 #[debug(skip)]
641 callback: Sender<bool>,
642 },
643
644 ContainsKeys {
645 id: ApplicationId,
646 #[debug(with = hex_vec_debug)]
647 keys: Vec<Vec<u8>>,
648 callback: Sender<Vec<bool>>,
649 },
650
651 ReadMultiValuesBytes {
652 id: ApplicationId,
653 #[debug(with = hex_vec_debug)]
654 keys: Vec<Vec<u8>>,
655 #[debug(skip)]
656 callback: Sender<Vec<Option<Vec<u8>>>>,
657 },
658
659 FindKeysByPrefix {
660 id: ApplicationId,
661 #[debug(with = hex_debug)]
662 key_prefix: Vec<u8>,
663 #[debug(skip)]
664 callback: Sender<Vec<Vec<u8>>>,
665 },
666
667 FindKeyValuesByPrefix {
668 id: ApplicationId,
669 #[debug(with = hex_debug)]
670 key_prefix: Vec<u8>,
671 #[debug(skip)]
672 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
673 },
674
675 WriteBatch {
676 id: ApplicationId,
677 batch: Batch,
678 #[debug(skip)]
679 callback: Sender<()>,
680 },
681
682 OpenChain {
683 ownership: ChainOwnership,
684 #[debug(skip_if = Amount::is_zero)]
685 balance: Amount,
686 parent_id: ChainId,
687 block_height: BlockHeight,
688 application_permissions: ApplicationPermissions,
689 timestamp: Timestamp,
690 #[debug(skip)]
691 txn_tracker: TransactionTracker,
692 #[debug(skip)]
693 callback: Sender<(ChainId, TransactionTracker)>,
694 },
695
696 CloseChain {
697 application_id: ApplicationId,
698 #[debug(skip)]
699 callback: Sender<Result<(), ExecutionError>>,
700 },
701
702 ChangeApplicationPermissions {
703 application_id: ApplicationId,
704 application_permissions: ApplicationPermissions,
705 #[debug(skip)]
706 callback: Sender<Result<(), ExecutionError>>,
707 },
708
709 CreateApplication {
710 chain_id: ChainId,
711 block_height: BlockHeight,
712 module_id: ModuleId,
713 parameters: Vec<u8>,
714 required_application_ids: Vec<ApplicationId>,
715 #[debug(skip)]
716 txn_tracker: TransactionTracker,
717 #[debug(skip)]
718 callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
719 },
720
721 PerformHttpRequest {
722 request: http::Request,
723 http_responses_are_oracle_responses: bool,
724 #[debug(skip)]
725 callback: Sender<http::Response>,
726 },
727
728 ReadBlobContent {
729 blob_id: BlobId,
730 #[debug(skip)]
731 callback: Sender<(BlobContent, bool)>,
732 },
733
734 AssertBlobExists {
735 blob_id: BlobId,
736 #[debug(skip)]
737 callback: Sender<bool>,
738 },
739
740 NextEventIndex {
741 stream_id: StreamId,
742 #[debug(skip)]
743 callback: Sender<u32>,
744 },
745
746 ReadEvent {
747 event_id: EventId,
748 callback: oneshot::Sender<Vec<u8>>,
749 },
750
751 SubscribeToEvents {
752 chain_id: ChainId,
753 stream_id: StreamId,
754 subscriber_app_id: ApplicationId,
755 #[debug(skip)]
756 callback: Sender<u32>,
757 },
758
759 UnsubscribeFromEvents {
760 chain_id: ChainId,
761 stream_id: StreamId,
762 subscriber_app_id: ApplicationId,
763 #[debug(skip)]
764 callback: Sender<()>,
765 },
766
767 GetApplicationPermissions {
768 #[debug(skip)]
769 callback: Sender<ApplicationPermissions>,
770 },
771}