linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6#[cfg(with_metrics)]
7use std::sync::LazyLock;
8#[cfg(not(web))]
9use std::time::Duration;
10
11use custom_debug_derive::Debug;
12use futures::{channel::mpsc, StreamExt as _};
13#[cfg(with_metrics)]
14use linera_base::prometheus_util::{
15    exponential_bucket_latencies, register_histogram_vec, MeasureLatency as _,
16};
17use linera_base::{
18    data_types::{
19        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, Timestamp,
20    },
21    ensure, hex_debug, hex_vec_debug, http,
22    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, MessageId, StreamId},
23    ownership::ChainOwnership,
24};
25use linera_views::{batch::Batch, context::Context, views::View};
26use oneshot::Sender;
27#[cfg(with_metrics)]
28use prometheus::HistogramVec;
29use reqwest::{header::HeaderMap, Client, Url};
30
31use crate::{
32    system::{CreateApplicationResult, OpenChainConfig, Recipient},
33    util::RespondExt,
34    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
35    ExecutionStateView, ModuleId, OutgoingMessage, ResourceController, TransactionTracker,
36    UserContractCode, UserServiceCode,
37};
38
/// Lazily registered histogram tracking how long `load_contract` takes.
#[cfg(with_metrics)]
static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    // The histogram carries no labels, hence the empty label slice.
    let label_names = &[];
    let name = "load_contract_latency";
    let description = "Load contract latency";
    register_histogram_vec(
        name,
        description,
        label_names,
        exponential_bucket_latencies(250.0),
    )
});
49
/// Lazily registered histogram tracking how long `load_service` takes.
#[cfg(with_metrics)]
static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    // The histogram carries no labels, hence the empty label slice.
    let label_names = &[];
    let name = "load_service_latency";
    let description = "Load service latency";
    register_histogram_vec(
        name,
        description,
        label_names,
        exponential_bucket_latencies(250.0),
    )
});
60
/// Sending half of the unbounded channel that carries [`ExecutionRequest`]s.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
62
63impl<C> ExecutionStateView<C>
64where
65    C: Context + Clone + Send + Sync + 'static,
66    C::Extra: ExecutionRuntimeContext,
67{
68    pub(crate) async fn load_contract(
69        &mut self,
70        id: ApplicationId,
71        txn_tracker: &mut TransactionTracker,
72    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
73        #[cfg(with_metrics)]
74        let _latency = LOAD_CONTRACT_LATENCY.measure_latency();
75        let blob_id = id.description_blob_id();
76        let description = match txn_tracker.created_blobs().get(&blob_id) {
77            Some(description) => {
78                let blob = description.clone();
79                bcs::from_bytes(blob.bytes())?
80            }
81            None => {
82                self.system
83                    .describe_application(id, Some(txn_tracker))
84                    .await?
85            }
86        };
87        let code = self
88            .context()
89            .extra()
90            .get_user_contract(&description)
91            .await?;
92        Ok((code, description))
93    }
94
95    pub(crate) async fn load_service(
96        &mut self,
97        id: ApplicationId,
98        txn_tracker: Option<&mut TransactionTracker>,
99    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
100        #[cfg(with_metrics)]
101        let _latency = LOAD_SERVICE_LATENCY.measure_latency();
102        let blob_id = id.description_blob_id();
103        let description = match txn_tracker
104            .as_ref()
105            .and_then(|tracker| tracker.created_blobs().get(&blob_id))
106        {
107            Some(description) => {
108                let blob = description.clone();
109                bcs::from_bytes(blob.bytes())?
110            }
111            None => self.system.describe_application(id, txn_tracker).await?,
112        };
113        let code = self
114            .context()
115            .extra()
116            .get_user_service(&description)
117            .await?;
118        Ok((code, description))
119    }
120
121    // TODO(#1416): Support concurrent I/O.
122    pub(crate) async fn handle_request(
123        &mut self,
124        request: ExecutionRequest,
125        resource_controller: &mut ResourceController<Option<AccountOwner>>,
126    ) -> Result<(), ExecutionError> {
127        use ExecutionRequest::*;
128        match request {
129            #[cfg(not(web))]
130            LoadContract {
131                id,
132                callback,
133                mut txn_tracker,
134            } => {
135                let (code, description) = self.load_contract(id, &mut txn_tracker).await?;
136                callback.respond((code, description, txn_tracker))
137            }
138            #[cfg(not(web))]
139            LoadService {
140                id,
141                callback,
142                mut txn_tracker,
143            } => {
144                let (code, description) = self.load_service(id, Some(&mut txn_tracker)).await?;
145                callback.respond((code, description, txn_tracker))
146            }
147
148            ChainBalance { callback } => {
149                let balance = *self.system.balance.get();
150                callback.respond(balance);
151            }
152
153            OwnerBalance { owner, callback } => {
154                let balance = self.system.balances.get(&owner).await?.unwrap_or_default();
155                callback.respond(balance);
156            }
157
158            OwnerBalances { callback } => {
159                let balances = self.system.balances.index_values().await?;
160                callback.respond(balances.into_iter().collect());
161            }
162
163            BalanceOwners { callback } => {
164                let owners = self.system.balances.indices().await?;
165                callback.respond(owners);
166            }
167
168            Transfer {
169                source,
170                destination,
171                amount,
172                signer,
173                application_id,
174                callback,
175            } => callback.respond(
176                self.system
177                    .transfer(
178                        signer,
179                        Some(application_id),
180                        source,
181                        Recipient::Account(destination),
182                        amount,
183                    )
184                    .await?,
185            ),
186
187            Claim {
188                source,
189                destination,
190                amount,
191                signer,
192                application_id,
193                callback,
194            } => callback.respond(
195                self.system
196                    .claim(
197                        signer,
198                        Some(application_id),
199                        source.owner,
200                        source.chain_id,
201                        Recipient::Account(destination),
202                        amount,
203                    )
204                    .await?,
205            ),
206
207            SystemTimestamp { callback } => {
208                let timestamp = *self.system.timestamp.get();
209                callback.respond(timestamp);
210            }
211
212            ChainOwnership { callback } => {
213                let ownership = self.system.ownership.get().clone();
214                callback.respond(ownership);
215            }
216
217            ContainsKey { id, key, callback } => {
218                let view = self.users.try_load_entry(&id).await?;
219                let result = match view {
220                    Some(view) => view.contains_key(&key).await?,
221                    None => false,
222                };
223                callback.respond(result);
224            }
225
226            ContainsKeys { id, keys, callback } => {
227                let view = self.users.try_load_entry(&id).await?;
228                let result = match view {
229                    Some(view) => view.contains_keys(keys).await?,
230                    None => vec![false; keys.len()],
231                };
232                callback.respond(result);
233            }
234
235            ReadMultiValuesBytes { id, keys, callback } => {
236                let view = self.users.try_load_entry(&id).await?;
237                let values = match view {
238                    Some(view) => view.multi_get(keys).await?,
239                    None => vec![None; keys.len()],
240                };
241                callback.respond(values);
242            }
243
244            ReadValueBytes { id, key, callback } => {
245                let view = self.users.try_load_entry(&id).await?;
246                let result = match view {
247                    Some(view) => view.get(&key).await?,
248                    None => None,
249                };
250                callback.respond(result);
251            }
252
253            FindKeysByPrefix {
254                id,
255                key_prefix,
256                callback,
257            } => {
258                let view = self.users.try_load_entry(&id).await?;
259                let result = match view {
260                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
261                    None => Vec::new(),
262                };
263                callback.respond(result);
264            }
265
266            FindKeyValuesByPrefix {
267                id,
268                key_prefix,
269                callback,
270            } => {
271                let view = self.users.try_load_entry(&id).await?;
272                let result = match view {
273                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
274                    None => Vec::new(),
275                };
276                callback.respond(result);
277            }
278
279            WriteBatch {
280                id,
281                batch,
282                callback,
283            } => {
284                let mut view = self.users.try_load_entry_mut(&id).await?;
285                view.write_batch(batch).await?;
286                callback.respond(());
287            }
288
289            OpenChain {
290                ownership,
291                balance,
292                next_message_id,
293                application_permissions,
294                callback,
295            } => {
296                let inactive_err = || ExecutionError::InactiveChain;
297                let config = OpenChainConfig {
298                    ownership,
299                    admin_id: self.system.admin_id.get().ok_or_else(inactive_err)?,
300                    epoch: self.system.epoch.get().ok_or_else(inactive_err)?,
301                    committees: self.system.committees.get().clone(),
302                    balance,
303                    application_permissions,
304                };
305                callback.respond(self.system.open_chain(config, next_message_id).await?);
306            }
307
308            CloseChain {
309                application_id,
310                callback,
311            } => {
312                let app_permissions = self.system.application_permissions.get();
313                if !app_permissions.can_close_chain(&application_id) {
314                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
315                } else {
316                    self.system.close_chain().await?;
317                    callback.respond(Ok(()));
318                }
319            }
320
321            ChangeApplicationPermissions {
322                application_id,
323                application_permissions,
324                callback,
325            } => {
326                let app_permissions = self.system.application_permissions.get();
327                if !app_permissions.can_change_application_permissions(&application_id) {
328                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
329                } else {
330                    self.system
331                        .application_permissions
332                        .set(application_permissions);
333                    callback.respond(Ok(()));
334                }
335            }
336
337            CreateApplication {
338                chain_id,
339                block_height,
340                module_id,
341                parameters,
342                required_application_ids,
343                callback,
344                txn_tracker,
345            } => {
346                let create_application_result = self
347                    .system
348                    .create_application(
349                        chain_id,
350                        block_height,
351                        module_id,
352                        parameters,
353                        required_application_ids,
354                        txn_tracker,
355                    )
356                    .await?;
357                callback.respond(Ok(create_application_result));
358            }
359
360            PerformHttpRequest {
361                request,
362                http_responses_are_oracle_responses,
363                callback,
364            } => {
365                let headers = request
366                    .headers
367                    .into_iter()
368                    .map(|http::Header { name, value }| Ok((name.parse()?, value.try_into()?)))
369                    .collect::<Result<HeaderMap, ExecutionError>>()?;
370
371                let url = Url::parse(&request.url)?;
372                let host = url
373                    .host_str()
374                    .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
375
376                let (_epoch, committee) = self
377                    .system
378                    .current_committee()
379                    .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
380                let allowed_hosts = &committee.policy().http_request_allow_list;
381
382                ensure!(
383                    allowed_hosts.contains(host),
384                    ExecutionError::UnauthorizedHttpRequest(url)
385                );
386
387                #[cfg_attr(web, allow(unused_mut))]
388                let mut request = Client::new()
389                    .request(request.method.into(), url)
390                    .body(request.body)
391                    .headers(headers);
392                #[cfg(not(web))]
393                {
394                    request = request.timeout(Duration::from_millis(
395                        committee.policy().http_request_timeout_ms,
396                    ));
397                }
398
399                let response = request.send().await?;
400
401                let mut response_size_limit = committee.policy().maximum_http_response_bytes;
402
403                if http_responses_are_oracle_responses {
404                    response_size_limit =
405                        response_size_limit.min(committee.policy().maximum_oracle_response_bytes);
406                }
407
408                callback.respond(
409                    self.receive_http_response(response, response_size_limit)
410                        .await?,
411                );
412            }
413
414            ReadBlobContent { blob_id, callback } => {
415                let blob = self.system.read_blob_content(blob_id).await?;
416                if blob_id.blob_type == BlobType::Data {
417                    resource_controller
418                        .with_state(&mut self.system)
419                        .await?
420                        .track_blob_read(blob.bytes().len() as u64)?;
421                }
422                let is_new = self.system.blob_used(None, blob_id).await?;
423                callback.respond((blob, is_new))
424            }
425
426            AssertBlobExists { blob_id, callback } => {
427                self.system.assert_blob_exists(blob_id).await?;
428                // Treating this as reading a size-0 blob for fee purposes.
429                if blob_id.blob_type == BlobType::Data {
430                    resource_controller
431                        .with_state(&mut self.system)
432                        .await?
433                        .track_blob_read(0)?;
434                }
435                callback.respond(self.system.blob_used(None, blob_id).await?)
436            }
437
438            NextEventIndex {
439                stream_id,
440                callback,
441            } => {
442                let count = self
443                    .stream_event_counts
444                    .get_mut_or_default(&stream_id)
445                    .await?;
446                let index = *count;
447                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
448                callback.respond(index)
449            }
450
451            ReadEvent { event_id, callback } => {
452                let event_value = self.context().extra().get_event(event_id).await?;
453                callback.respond(event_value);
454            }
455
456            SubscribeToEvents {
457                chain_id,
458                stream_id,
459                subscriber_app_id,
460                callback,
461            } => {
462                let subscriptions = self
463                    .system
464                    .event_subscriptions
465                    .get_mut_or_default(&(chain_id, stream_id))
466                    .await?;
467                subscriptions.applications.insert(subscriber_app_id);
468                callback.respond(());
469            }
470
471            UnsubscribeFromEvents {
472                chain_id,
473                stream_id,
474                subscriber_app_id,
475                callback,
476            } => {
477                let key = (chain_id, stream_id);
478                let subscriptions = self
479                    .system
480                    .event_subscriptions
481                    .get_mut_or_default(&key)
482                    .await?;
483                subscriptions.applications.remove(&subscriber_app_id);
484                if subscriptions.applications.is_empty() {
485                    self.system.event_subscriptions.remove(&key)?;
486                }
487                callback.respond(());
488            }
489
490            GetApplicationPermissions { callback } => {
491                let app_permissions = self.system.application_permissions.get();
492                callback.respond(app_permissions.clone());
493            }
494        }
495
496        Ok(())
497    }
498}
499
500impl<C> ExecutionStateView<C>
501where
502    C: Context + Clone + Send + Sync + 'static,
503    C::Extra: ExecutionRuntimeContext,
504{
505    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
506    ///
507    /// Ensures that the response does not exceed the provided `size_limit`.
508    async fn receive_http_response(
509        &mut self,
510        response: reqwest::Response,
511        size_limit: u64,
512    ) -> Result<http::Response, ExecutionError> {
513        let status = response.status().as_u16();
514        let maybe_content_length = response.content_length();
515
516        let headers = response
517            .headers()
518            .iter()
519            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
520            .collect::<Vec<_>>();
521
522        let total_header_size = headers
523            .iter()
524            .map(|header| (header.name.len() + header.value.len()) as u64)
525            .sum();
526
527        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
528            ExecutionError::HttpResponseSizeLimitExceeded {
529                limit: size_limit,
530                size: total_header_size,
531            },
532        )?;
533
534        if let Some(content_length) = maybe_content_length {
535            if content_length > remaining_bytes {
536                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
537                    limit: size_limit,
538                    size: content_length + total_header_size,
539                });
540            }
541        }
542
543        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
544        let mut body_stream = response.bytes_stream();
545
546        while let Some(bytes) = body_stream.next().await.transpose()? {
547            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
548                ExecutionError::HttpResponseSizeLimitExceeded {
549                    limit: size_limit,
550                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
551                },
552            )?;
553
554            body.extend(&bytes);
555        }
556
557        Ok(http::Response {
558            status,
559            headers,
560            body,
561        })
562    }
563}
564
565/// Requests to the execution state.
566#[derive(Debug)]
567pub enum ExecutionRequest {
568    #[cfg(not(web))]
569    LoadContract {
570        id: ApplicationId,
571        #[debug(skip)]
572        callback: Sender<(UserContractCode, ApplicationDescription, TransactionTracker)>,
573        #[debug(skip)]
574        txn_tracker: TransactionTracker,
575    },
576
577    #[cfg(not(web))]
578    LoadService {
579        id: ApplicationId,
580        #[debug(skip)]
581        callback: Sender<(UserServiceCode, ApplicationDescription, TransactionTracker)>,
582        #[debug(skip)]
583        txn_tracker: TransactionTracker,
584    },
585
586    ChainBalance {
587        #[debug(skip)]
588        callback: Sender<Amount>,
589    },
590
591    OwnerBalance {
592        owner: AccountOwner,
593        #[debug(skip)]
594        callback: Sender<Amount>,
595    },
596
597    OwnerBalances {
598        #[debug(skip)]
599        callback: Sender<Vec<(AccountOwner, Amount)>>,
600    },
601
602    BalanceOwners {
603        #[debug(skip)]
604        callback: Sender<Vec<AccountOwner>>,
605    },
606
607    Transfer {
608        source: AccountOwner,
609        destination: Account,
610        amount: Amount,
611        #[debug(skip_if = Option::is_none)]
612        signer: Option<AccountOwner>,
613        application_id: ApplicationId,
614        #[debug(skip)]
615        callback: Sender<Option<OutgoingMessage>>,
616    },
617
618    Claim {
619        source: Account,
620        destination: Account,
621        amount: Amount,
622        #[debug(skip_if = Option::is_none)]
623        signer: Option<AccountOwner>,
624        application_id: ApplicationId,
625        #[debug(skip)]
626        callback: Sender<OutgoingMessage>,
627    },
628
629    SystemTimestamp {
630        #[debug(skip)]
631        callback: Sender<Timestamp>,
632    },
633
634    ChainOwnership {
635        #[debug(skip)]
636        callback: Sender<ChainOwnership>,
637    },
638
639    ReadValueBytes {
640        id: ApplicationId,
641        #[debug(with = hex_debug)]
642        key: Vec<u8>,
643        #[debug(skip)]
644        callback: Sender<Option<Vec<u8>>>,
645    },
646
647    ContainsKey {
648        id: ApplicationId,
649        key: Vec<u8>,
650        #[debug(skip)]
651        callback: Sender<bool>,
652    },
653
654    ContainsKeys {
655        id: ApplicationId,
656        #[debug(with = hex_vec_debug)]
657        keys: Vec<Vec<u8>>,
658        callback: Sender<Vec<bool>>,
659    },
660
661    ReadMultiValuesBytes {
662        id: ApplicationId,
663        #[debug(with = hex_vec_debug)]
664        keys: Vec<Vec<u8>>,
665        #[debug(skip)]
666        callback: Sender<Vec<Option<Vec<u8>>>>,
667    },
668
669    FindKeysByPrefix {
670        id: ApplicationId,
671        #[debug(with = hex_debug)]
672        key_prefix: Vec<u8>,
673        #[debug(skip)]
674        callback: Sender<Vec<Vec<u8>>>,
675    },
676
677    FindKeyValuesByPrefix {
678        id: ApplicationId,
679        #[debug(with = hex_debug)]
680        key_prefix: Vec<u8>,
681        #[debug(skip)]
682        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
683    },
684
685    WriteBatch {
686        id: ApplicationId,
687        batch: Batch,
688        #[debug(skip)]
689        callback: Sender<()>,
690    },
691
692    OpenChain {
693        ownership: ChainOwnership,
694        #[debug(skip_if = Amount::is_zero)]
695        balance: Amount,
696        next_message_id: MessageId,
697        application_permissions: ApplicationPermissions,
698        #[debug(skip)]
699        callback: Sender<OutgoingMessage>,
700    },
701
702    CloseChain {
703        application_id: ApplicationId,
704        #[debug(skip)]
705        callback: Sender<Result<(), ExecutionError>>,
706    },
707
708    ChangeApplicationPermissions {
709        application_id: ApplicationId,
710        application_permissions: ApplicationPermissions,
711        #[debug(skip)]
712        callback: Sender<Result<(), ExecutionError>>,
713    },
714
715    CreateApplication {
716        chain_id: ChainId,
717        block_height: BlockHeight,
718        module_id: ModuleId,
719        parameters: Vec<u8>,
720        required_application_ids: Vec<ApplicationId>,
721        #[debug(skip)]
722        txn_tracker: TransactionTracker,
723        #[debug(skip)]
724        callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
725    },
726
727    PerformHttpRequest {
728        request: http::Request,
729        http_responses_are_oracle_responses: bool,
730        #[debug(skip)]
731        callback: Sender<http::Response>,
732    },
733
734    ReadBlobContent {
735        blob_id: BlobId,
736        #[debug(skip)]
737        callback: Sender<(BlobContent, bool)>,
738    },
739
740    AssertBlobExists {
741        blob_id: BlobId,
742        #[debug(skip)]
743        callback: Sender<bool>,
744    },
745
746    NextEventIndex {
747        stream_id: StreamId,
748        #[debug(skip)]
749        callback: Sender<u32>,
750    },
751
752    ReadEvent {
753        event_id: EventId,
754        callback: oneshot::Sender<Vec<u8>>,
755    },
756
757    SubscribeToEvents {
758        chain_id: ChainId,
759        stream_id: StreamId,
760        subscriber_app_id: ApplicationId,
761        #[debug(skip)]
762        callback: Sender<()>,
763    },
764
765    UnsubscribeFromEvents {
766        chain_id: ChainId,
767        stream_id: StreamId,
768        subscriber_app_id: ApplicationId,
769        #[debug(skip)]
770        callback: Sender<()>,
771    },
772
773    GetApplicationPermissions {
774        #[debug(skip)]
775        callback: Sender<ApplicationPermissions>,
776    },
777}