linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6use custom_debug_derive::Debug;
7use futures::{channel::mpsc, StreamExt as _};
8#[cfg(with_metrics)]
9use linera_base::prometheus_util::MeasureLatency as _;
10use linera_base::{
11    data_types::{
12        Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, Timestamp,
13    },
14    ensure, hex_debug, hex_vec_debug, http,
15    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
16    ownership::ChainOwnership,
17};
18use linera_views::{batch::Batch, context::Context, views::View};
19use oneshot::Sender;
20use reqwest::{header::HeaderMap, Client, Url};
21
22use crate::{
23    system::{CreateApplicationResult, OpenChainConfig},
24    util::RespondExt,
25    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
26    ExecutionStateView, ModuleId, OutgoingMessage, ResourceController, TransactionTracker,
27    UserContractCode, UserServiceCode,
28};
29
/// Prometheus histograms tracking how long loading application bytecode takes.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a histogram without labels, using the latency buckets shared by the
    /// metrics of this module.
    fn latency_histogram(name: &str, description: &str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(250.0))
    }

    /// Histogram of the latency to load a contract bytecode.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_contract_latency", "Load contract latency"));

    /// Histogram of the latency to load a service bytecode.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_service_latency", "Load service latency"));
}
57
/// Sending half of the unbounded channel that carries [`ExecutionRequest`]s.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
59
60impl<C> ExecutionStateView<C>
61where
62    C: Context + Clone + Send + Sync + 'static,
63    C::Extra: ExecutionRuntimeContext,
64{
65    pub(crate) async fn load_contract(
66        &mut self,
67        id: ApplicationId,
68        txn_tracker: &mut TransactionTracker,
69    ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
70        #[cfg(with_metrics)]
71        let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
72        let blob_id = id.description_blob_id();
73        let description = match txn_tracker.get_blob_content(&blob_id) {
74            Some(blob) => bcs::from_bytes(blob.bytes())?,
75            None => self.system.describe_application(id, txn_tracker).await?,
76        };
77        let code = self
78            .context()
79            .extra()
80            .get_user_contract(&description, txn_tracker)
81            .await?;
82        Ok((code, description))
83    }
84
85    pub(crate) async fn load_service(
86        &mut self,
87        id: ApplicationId,
88        txn_tracker: &mut TransactionTracker,
89    ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
90        #[cfg(with_metrics)]
91        let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
92        let blob_id = id.description_blob_id();
93        let description = match txn_tracker.get_blob_content(&blob_id) {
94            Some(blob) => bcs::from_bytes(blob.bytes())?,
95            None => self.system.describe_application(id, txn_tracker).await?,
96        };
97        let code = self
98            .context()
99            .extra()
100            .get_user_service(&description, txn_tracker)
101            .await?;
102        Ok((code, description))
103    }
104
105    // TODO(#1416): Support concurrent I/O.
106    pub(crate) async fn handle_request(
107        &mut self,
108        request: ExecutionRequest,
109        resource_controller: &mut ResourceController<Option<AccountOwner>>,
110    ) -> Result<(), ExecutionError> {
111        use ExecutionRequest::*;
112        match request {
113            #[cfg(not(web))]
114            LoadContract {
115                id,
116                callback,
117                mut txn_tracker,
118            } => {
119                let (code, description) = self.load_contract(id, &mut txn_tracker).await?;
120                callback.respond((code, description, txn_tracker))
121            }
122            #[cfg(not(web))]
123            LoadService {
124                id,
125                callback,
126                mut txn_tracker,
127            } => {
128                let (code, description) = self.load_service(id, &mut txn_tracker).await?;
129                callback.respond((code, description, txn_tracker))
130            }
131
132            ChainBalance { callback } => {
133                let balance = *self.system.balance.get();
134                callback.respond(balance);
135            }
136
137            OwnerBalance { owner, callback } => {
138                let balance = self.system.balances.get(&owner).await?.unwrap_or_default();
139                callback.respond(balance);
140            }
141
142            OwnerBalances { callback } => {
143                let balances = self.system.balances.index_values().await?;
144                callback.respond(balances.into_iter().collect());
145            }
146
147            BalanceOwners { callback } => {
148                let owners = self.system.balances.indices().await?;
149                callback.respond(owners);
150            }
151
152            Transfer {
153                source,
154                destination,
155                amount,
156                signer,
157                application_id,
158                callback,
159            } => callback.respond(
160                self.system
161                    .transfer(signer, Some(application_id), source, destination, amount)
162                    .await?,
163            ),
164
165            Claim {
166                source,
167                destination,
168                amount,
169                signer,
170                application_id,
171                callback,
172            } => callback.respond(
173                self.system
174                    .claim(
175                        signer,
176                        Some(application_id),
177                        source.owner,
178                        source.chain_id,
179                        destination,
180                        amount,
181                    )
182                    .await?,
183            ),
184
185            SystemTimestamp { callback } => {
186                let timestamp = *self.system.timestamp.get();
187                callback.respond(timestamp);
188            }
189
190            ChainOwnership { callback } => {
191                let ownership = self.system.ownership.get().clone();
192                callback.respond(ownership);
193            }
194
195            ContainsKey { id, key, callback } => {
196                let view = self.users.try_load_entry(&id).await?;
197                let result = match view {
198                    Some(view) => view.contains_key(&key).await?,
199                    None => false,
200                };
201                callback.respond(result);
202            }
203
204            ContainsKeys { id, keys, callback } => {
205                let view = self.users.try_load_entry(&id).await?;
206                let result = match view {
207                    Some(view) => view.contains_keys(keys).await?,
208                    None => vec![false; keys.len()],
209                };
210                callback.respond(result);
211            }
212
213            ReadMultiValuesBytes { id, keys, callback } => {
214                let view = self.users.try_load_entry(&id).await?;
215                let values = match view {
216                    Some(view) => view.multi_get(keys).await?,
217                    None => vec![None; keys.len()],
218                };
219                callback.respond(values);
220            }
221
222            ReadValueBytes { id, key, callback } => {
223                let view = self.users.try_load_entry(&id).await?;
224                let result = match view {
225                    Some(view) => view.get(&key).await?,
226                    None => None,
227                };
228                callback.respond(result);
229            }
230
231            FindKeysByPrefix {
232                id,
233                key_prefix,
234                callback,
235            } => {
236                let view = self.users.try_load_entry(&id).await?;
237                let result = match view {
238                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
239                    None => Vec::new(),
240                };
241                callback.respond(result);
242            }
243
244            FindKeyValuesByPrefix {
245                id,
246                key_prefix,
247                callback,
248            } => {
249                let view = self.users.try_load_entry(&id).await?;
250                let result = match view {
251                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
252                    None => Vec::new(),
253                };
254                callback.respond(result);
255            }
256
257            WriteBatch {
258                id,
259                batch,
260                callback,
261            } => {
262                let mut view = self.users.try_load_entry_mut(&id).await?;
263                view.write_batch(batch).await?;
264                callback.respond(());
265            }
266
267            OpenChain {
268                ownership,
269                balance,
270                parent_id,
271                block_height,
272                application_permissions,
273                timestamp,
274                callback,
275                mut txn_tracker,
276            } => {
277                let config = OpenChainConfig {
278                    ownership,
279                    balance,
280                    application_permissions,
281                };
282                let chain_id = self
283                    .system
284                    .open_chain(config, parent_id, block_height, timestamp, &mut txn_tracker)
285                    .await?;
286                callback.respond((chain_id, txn_tracker));
287            }
288
289            CloseChain {
290                application_id,
291                callback,
292            } => {
293                let app_permissions = self.system.application_permissions.get();
294                if !app_permissions.can_close_chain(&application_id) {
295                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
296                } else {
297                    self.system.close_chain().await?;
298                    callback.respond(Ok(()));
299                }
300            }
301
302            ChangeApplicationPermissions {
303                application_id,
304                application_permissions,
305                callback,
306            } => {
307                let app_permissions = self.system.application_permissions.get();
308                if !app_permissions.can_change_application_permissions(&application_id) {
309                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
310                } else {
311                    self.system
312                        .application_permissions
313                        .set(application_permissions);
314                    callback.respond(Ok(()));
315                }
316            }
317
318            CreateApplication {
319                chain_id,
320                block_height,
321                module_id,
322                parameters,
323                required_application_ids,
324                callback,
325                txn_tracker,
326            } => {
327                let create_application_result = self
328                    .system
329                    .create_application(
330                        chain_id,
331                        block_height,
332                        module_id,
333                        parameters,
334                        required_application_ids,
335                        txn_tracker,
336                    )
337                    .await?;
338                callback.respond(Ok(create_application_result));
339            }
340
341            PerformHttpRequest {
342                request,
343                http_responses_are_oracle_responses,
344                callback,
345            } => {
346                let headers = request
347                    .headers
348                    .into_iter()
349                    .map(|http::Header { name, value }| Ok((name.parse()?, value.try_into()?)))
350                    .collect::<Result<HeaderMap, ExecutionError>>()?;
351
352                let url = Url::parse(&request.url)?;
353                let host = url
354                    .host_str()
355                    .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
356
357                let (_epoch, committee) = self
358                    .system
359                    .current_committee()
360                    .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
361                let allowed_hosts = &committee.policy().http_request_allow_list;
362
363                ensure!(
364                    allowed_hosts.contains(host),
365                    ExecutionError::UnauthorizedHttpRequest(url)
366                );
367
368                let request = Client::new()
369                    .request(request.method.into(), url)
370                    .body(request.body)
371                    .headers(headers);
372                #[cfg(not(web))]
373                let request = request.timeout(linera_base::time::Duration::from_millis(
374                    committee.policy().http_request_timeout_ms,
375                ));
376
377                let response = request.send().await?;
378
379                let mut response_size_limit = committee.policy().maximum_http_response_bytes;
380
381                if http_responses_are_oracle_responses {
382                    response_size_limit =
383                        response_size_limit.min(committee.policy().maximum_oracle_response_bytes);
384                }
385
386                callback.respond(
387                    self.receive_http_response(response, response_size_limit)
388                        .await?,
389                );
390            }
391
392            ReadBlobContent { blob_id, callback } => {
393                let content = self.system.read_blob_content(blob_id).await?;
394                if blob_id.blob_type == BlobType::Data {
395                    resource_controller
396                        .with_state(&mut self.system)
397                        .await?
398                        .track_blob_read(content.bytes().len() as u64)?;
399                }
400                let is_new = self
401                    .system
402                    .blob_used(&mut TransactionTracker::default(), blob_id)
403                    .await?;
404                callback.respond((content, is_new))
405            }
406
407            AssertBlobExists { blob_id, callback } => {
408                self.system.assert_blob_exists(blob_id).await?;
409                // Treating this as reading a size-0 blob for fee purposes.
410                if blob_id.blob_type == BlobType::Data {
411                    resource_controller
412                        .with_state(&mut self.system)
413                        .await?
414                        .track_blob_read(0)?;
415                }
416                callback.respond(
417                    self.system
418                        .blob_used(&mut TransactionTracker::default(), blob_id)
419                        .await?,
420                )
421            }
422
423            NextEventIndex {
424                stream_id,
425                callback,
426            } => {
427                let count = self
428                    .stream_event_counts
429                    .get_mut_or_default(&stream_id)
430                    .await?;
431                let index = *count;
432                *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
433                callback.respond(index)
434            }
435
436            ReadEvent { event_id, callback } => {
437                let event = self.context().extra().get_event(event_id.clone()).await?;
438                let bytes = event.ok_or(ExecutionError::EventsNotFound(vec![event_id]))?;
439                callback.respond(bytes);
440            }
441
442            SubscribeToEvents {
443                chain_id,
444                stream_id,
445                subscriber_app_id,
446                callback,
447            } => {
448                let subscriptions = self
449                    .system
450                    .event_subscriptions
451                    .get_mut_or_default(&(chain_id, stream_id))
452                    .await?;
453                let next_index = if subscriptions.applications.insert(subscriber_app_id) {
454                    subscriptions.next_index
455                } else {
456                    0
457                };
458                callback.respond(next_index);
459            }
460
461            UnsubscribeFromEvents {
462                chain_id,
463                stream_id,
464                subscriber_app_id,
465                callback,
466            } => {
467                let key = (chain_id, stream_id);
468                let subscriptions = self
469                    .system
470                    .event_subscriptions
471                    .get_mut_or_default(&key)
472                    .await?;
473                subscriptions.applications.remove(&subscriber_app_id);
474                if subscriptions.applications.is_empty() {
475                    self.system.event_subscriptions.remove(&key)?;
476                }
477                callback.respond(());
478            }
479
480            GetApplicationPermissions { callback } => {
481                let app_permissions = self.system.application_permissions.get();
482                callback.respond(app_permissions.clone());
483            }
484        }
485
486        Ok(())
487    }
488}
489
490impl<C> ExecutionStateView<C>
491where
492    C: Context + Clone + Send + Sync + 'static,
493    C::Extra: ExecutionRuntimeContext,
494{
495    /// Receives an HTTP response, returning the prepared [`http::Response`] instance.
496    ///
497    /// Ensures that the response does not exceed the provided `size_limit`.
498    async fn receive_http_response(
499        &mut self,
500        response: reqwest::Response,
501        size_limit: u64,
502    ) -> Result<http::Response, ExecutionError> {
503        let status = response.status().as_u16();
504        let maybe_content_length = response.content_length();
505
506        let headers = response
507            .headers()
508            .iter()
509            .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
510            .collect::<Vec<_>>();
511
512        let total_header_size = headers
513            .iter()
514            .map(|header| (header.name.len() + header.value.len()) as u64)
515            .sum();
516
517        let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
518            ExecutionError::HttpResponseSizeLimitExceeded {
519                limit: size_limit,
520                size: total_header_size,
521            },
522        )?;
523
524        if let Some(content_length) = maybe_content_length {
525            if content_length > remaining_bytes {
526                return Err(ExecutionError::HttpResponseSizeLimitExceeded {
527                    limit: size_limit,
528                    size: content_length + total_header_size,
529                });
530            }
531        }
532
533        let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
534        let mut body_stream = response.bytes_stream();
535
536        while let Some(bytes) = body_stream.next().await.transpose()? {
537            remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
538                ExecutionError::HttpResponseSizeLimitExceeded {
539                    limit: size_limit,
540                    size: bytes.len() as u64 + (size_limit - remaining_bytes),
541                },
542            )?;
543
544            body.extend(&bytes);
545        }
546
547        Ok(http::Response {
548            status,
549            headers,
550            body,
551        })
552    }
553}
554
555/// Requests to the execution state.
556#[derive(Debug)]
557pub enum ExecutionRequest {
558    #[cfg(not(web))]
559    LoadContract {
560        id: ApplicationId,
561        #[debug(skip)]
562        callback: Sender<(UserContractCode, ApplicationDescription, TransactionTracker)>,
563        #[debug(skip)]
564        txn_tracker: TransactionTracker,
565    },
566
567    #[cfg(not(web))]
568    LoadService {
569        id: ApplicationId,
570        #[debug(skip)]
571        callback: Sender<(UserServiceCode, ApplicationDescription, TransactionTracker)>,
572        #[debug(skip)]
573        txn_tracker: TransactionTracker,
574    },
575
576    ChainBalance {
577        #[debug(skip)]
578        callback: Sender<Amount>,
579    },
580
581    OwnerBalance {
582        owner: AccountOwner,
583        #[debug(skip)]
584        callback: Sender<Amount>,
585    },
586
587    OwnerBalances {
588        #[debug(skip)]
589        callback: Sender<Vec<(AccountOwner, Amount)>>,
590    },
591
592    BalanceOwners {
593        #[debug(skip)]
594        callback: Sender<Vec<AccountOwner>>,
595    },
596
597    Transfer {
598        source: AccountOwner,
599        destination: Account,
600        amount: Amount,
601        #[debug(skip_if = Option::is_none)]
602        signer: Option<AccountOwner>,
603        application_id: ApplicationId,
604        #[debug(skip)]
605        callback: Sender<Option<OutgoingMessage>>,
606    },
607
608    Claim {
609        source: Account,
610        destination: Account,
611        amount: Amount,
612        #[debug(skip_if = Option::is_none)]
613        signer: Option<AccountOwner>,
614        application_id: ApplicationId,
615        #[debug(skip)]
616        callback: Sender<Option<OutgoingMessage>>,
617    },
618
619    SystemTimestamp {
620        #[debug(skip)]
621        callback: Sender<Timestamp>,
622    },
623
624    ChainOwnership {
625        #[debug(skip)]
626        callback: Sender<ChainOwnership>,
627    },
628
629    ReadValueBytes {
630        id: ApplicationId,
631        #[debug(with = hex_debug)]
632        key: Vec<u8>,
633        #[debug(skip)]
634        callback: Sender<Option<Vec<u8>>>,
635    },
636
637    ContainsKey {
638        id: ApplicationId,
639        key: Vec<u8>,
640        #[debug(skip)]
641        callback: Sender<bool>,
642    },
643
644    ContainsKeys {
645        id: ApplicationId,
646        #[debug(with = hex_vec_debug)]
647        keys: Vec<Vec<u8>>,
648        callback: Sender<Vec<bool>>,
649    },
650
651    ReadMultiValuesBytes {
652        id: ApplicationId,
653        #[debug(with = hex_vec_debug)]
654        keys: Vec<Vec<u8>>,
655        #[debug(skip)]
656        callback: Sender<Vec<Option<Vec<u8>>>>,
657    },
658
659    FindKeysByPrefix {
660        id: ApplicationId,
661        #[debug(with = hex_debug)]
662        key_prefix: Vec<u8>,
663        #[debug(skip)]
664        callback: Sender<Vec<Vec<u8>>>,
665    },
666
667    FindKeyValuesByPrefix {
668        id: ApplicationId,
669        #[debug(with = hex_debug)]
670        key_prefix: Vec<u8>,
671        #[debug(skip)]
672        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
673    },
674
675    WriteBatch {
676        id: ApplicationId,
677        batch: Batch,
678        #[debug(skip)]
679        callback: Sender<()>,
680    },
681
682    OpenChain {
683        ownership: ChainOwnership,
684        #[debug(skip_if = Amount::is_zero)]
685        balance: Amount,
686        parent_id: ChainId,
687        block_height: BlockHeight,
688        application_permissions: ApplicationPermissions,
689        timestamp: Timestamp,
690        #[debug(skip)]
691        txn_tracker: TransactionTracker,
692        #[debug(skip)]
693        callback: Sender<(ChainId, TransactionTracker)>,
694    },
695
696    CloseChain {
697        application_id: ApplicationId,
698        #[debug(skip)]
699        callback: Sender<Result<(), ExecutionError>>,
700    },
701
702    ChangeApplicationPermissions {
703        application_id: ApplicationId,
704        application_permissions: ApplicationPermissions,
705        #[debug(skip)]
706        callback: Sender<Result<(), ExecutionError>>,
707    },
708
709    CreateApplication {
710        chain_id: ChainId,
711        block_height: BlockHeight,
712        module_id: ModuleId,
713        parameters: Vec<u8>,
714        required_application_ids: Vec<ApplicationId>,
715        #[debug(skip)]
716        txn_tracker: TransactionTracker,
717        #[debug(skip)]
718        callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
719    },
720
721    PerformHttpRequest {
722        request: http::Request,
723        http_responses_are_oracle_responses: bool,
724        #[debug(skip)]
725        callback: Sender<http::Response>,
726    },
727
728    ReadBlobContent {
729        blob_id: BlobId,
730        #[debug(skip)]
731        callback: Sender<(BlobContent, bool)>,
732    },
733
734    AssertBlobExists {
735        blob_id: BlobId,
736        #[debug(skip)]
737        callback: Sender<bool>,
738    },
739
740    NextEventIndex {
741        stream_id: StreamId,
742        #[debug(skip)]
743        callback: Sender<u32>,
744    },
745
746    ReadEvent {
747        event_id: EventId,
748        callback: oneshot::Sender<Vec<u8>>,
749    },
750
751    SubscribeToEvents {
752        chain_id: ChainId,
753        stream_id: StreamId,
754        subscriber_app_id: ApplicationId,
755        #[debug(skip)]
756        callback: Sender<u32>,
757    },
758
759    UnsubscribeFromEvents {
760        chain_id: ChainId,
761        stream_id: StreamId,
762        subscriber_app_id: ApplicationId,
763        #[debug(skip)]
764        callback: Sender<()>,
765    },
766
767    GetApplicationPermissions {
768        #[debug(skip)]
769        callback: Sender<ApplicationPermissions>,
770    },
771}