linera_execution/
execution_state_actor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Handle requests from the synchronous execution thread of user applications.
5
6use std::fmt::{self, Debug, Formatter};
7#[cfg(with_metrics)]
8use std::sync::LazyLock;
9
10use futures::channel::mpsc;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::{self, MeasureLatency as _};
13use linera_base::{
14    data_types::{Amount, ApplicationPermissions, BlobContent, Timestamp},
15    identifiers::{Account, BlobId, MessageId, Owner},
16    ownership::ChainOwnership,
17};
18use linera_views::{batch::Batch, context::Context, views::View};
19use oneshot::Sender;
20#[cfg(with_metrics)]
21use prometheus::HistogramVec;
22use reqwest::{header::CONTENT_TYPE, Client};
23
24use crate::{
25    system::{OpenChainConfig, Recipient},
26    util::RespondExt,
27    ExecutionError, ExecutionRuntimeContext, ExecutionStateView, RawExecutionOutcome,
28    RawOutgoingMessage, SystemExecutionError, SystemMessage, UserApplicationDescription,
29    UserApplicationId, UserContractCode, UserServiceCode,
30};
31
/// Latency histogram recorded while loading a contract bytecode.
#[cfg(with_metrics)]
static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    // Bucket boundaries handed to the histogram on registration.
    let buckets = vec![
        0.001, 0.002_5, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0, 100.0,
        250.0,
    ];
    prometheus_util::register_histogram_vec(
        "load_contract_latency",
        "Load contract latency",
        &[],
        Some(buckets),
    )
    .expect("Histogram creation should not fail")
});
46
/// Latency histogram recorded while loading a service bytecode.
#[cfg(with_metrics)]
static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    // Bucket boundaries handed to the histogram on registration.
    let buckets = vec![
        0.001, 0.002_5, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0, 100.0,
        250.0,
    ];
    prometheus_util::register_histogram_vec(
        "load_service_latency",
        "Load service latency",
        &[],
        Some(buckets),
    )
    .expect("Histogram creation should not fail")
});
61
/// Sending half of the unbounded channel that carries [`ExecutionRequest`]s.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
63
64impl<C> ExecutionStateView<C>
65where
66    C: Context + Clone + Send + Sync + 'static,
67    C::Extra: ExecutionRuntimeContext,
68{
69    // TODO(#1416): Support concurrent I/O.
70    pub(crate) async fn handle_request(
71        &mut self,
72        request: ExecutionRequest,
73    ) -> Result<(), ExecutionError> {
74        use ExecutionRequest::*;
75        match request {
76            LoadContract { id, callback } => {
77                #[cfg(with_metrics)]
78                let _latency = LOAD_CONTRACT_LATENCY.measure_latency();
79                let description = self.system.registry.describe_application(id).await?;
80                let code = self
81                    .context()
82                    .extra()
83                    .get_user_contract(&description)
84                    .await?;
85                callback.respond((code, description));
86            }
87
88            LoadService { id, callback } => {
89                #[cfg(with_metrics)]
90                let _latency = LOAD_SERVICE_LATENCY.measure_latency();
91                let description = self.system.registry.describe_application(id).await?;
92                let code = self
93                    .context()
94                    .extra()
95                    .get_user_service(&description)
96                    .await?;
97                callback.respond((code, description));
98            }
99
100            ChainBalance { callback } => {
101                let balance = *self.system.balance.get();
102                callback.respond(balance);
103            }
104
105            OwnerBalance { owner, callback } => {
106                let balance = self.system.balances.get(&owner).await?.unwrap_or_default();
107                callback.respond(balance);
108            }
109
110            OwnerBalances { callback } => {
111                let mut balances = Vec::new();
112                self.system
113                    .balances
114                    .for_each_index_value(|owner, balance| {
115                        balances.push((owner, balance));
116                        Ok(())
117                    })
118                    .await?;
119                callback.respond(balances);
120            }
121
122            BalanceOwners { callback } => {
123                let owners = self.system.balances.indices().await?;
124                callback.respond(owners);
125            }
126
127            Transfer {
128                source,
129                destination,
130                amount,
131                signer,
132                callback,
133            } => {
134                let mut execution_outcome = RawExecutionOutcome::default();
135                let message = self
136                    .system
137                    .transfer(signer, source, Recipient::Account(destination), amount)
138                    .await?;
139
140                if let Some(message) = message {
141                    execution_outcome.messages.push(message);
142                }
143                callback.respond(execution_outcome);
144            }
145
146            Claim {
147                source,
148                destination,
149                amount,
150                signer,
151                callback,
152            } => {
153                let owner = source.owner.ok_or(ExecutionError::OwnerIsNone)?;
154                let mut execution_outcome = RawExecutionOutcome::default();
155                let message = self
156                    .system
157                    .claim(
158                        signer,
159                        owner,
160                        source.chain_id,
161                        Recipient::Account(destination),
162                        amount,
163                    )
164                    .await?;
165
166                execution_outcome.messages.push(message);
167                callback.respond(execution_outcome);
168            }
169
170            SystemTimestamp { callback } => {
171                let timestamp = *self.system.timestamp.get();
172                callback.respond(timestamp);
173            }
174
175            ChainOwnership { callback } => {
176                let ownership = self.system.ownership.get().clone();
177                callback.respond(ownership);
178            }
179
180            ContainsKey { id, key, callback } => {
181                let view = self.users.try_load_entry(&id).await?;
182                let result = match view {
183                    Some(view) => view.contains_key(&key).await?,
184                    None => false,
185                };
186                callback.respond(result);
187            }
188
189            ContainsKeys { id, keys, callback } => {
190                let view = self.users.try_load_entry(&id).await?;
191                let result = match view {
192                    Some(view) => view.contains_keys(keys).await?,
193                    None => vec![false; keys.len()],
194                };
195                callback.respond(result);
196            }
197
198            ReadMultiValuesBytes { id, keys, callback } => {
199                let view = self.users.try_load_entry(&id).await?;
200                let values = match view {
201                    Some(view) => view.multi_get(keys).await?,
202                    None => vec![None; keys.len()],
203                };
204                callback.respond(values);
205            }
206
207            ReadValueBytes { id, key, callback } => {
208                let view = self.users.try_load_entry(&id).await?;
209                let result = match view {
210                    Some(view) => view.get(&key).await?,
211                    None => None,
212                };
213                callback.respond(result);
214            }
215
216            FindKeysByPrefix {
217                id,
218                key_prefix,
219                callback,
220            } => {
221                let view = self.users.try_load_entry(&id).await?;
222                let result = match view {
223                    Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
224                    None => Vec::new(),
225                };
226                callback.respond(result);
227            }
228
229            FindKeyValuesByPrefix {
230                id,
231                key_prefix,
232                callback,
233            } => {
234                let view = self.users.try_load_entry(&id).await?;
235                let result = match view {
236                    Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
237                    None => Vec::new(),
238                };
239                callback.respond(result);
240            }
241
242            WriteBatch {
243                id,
244                batch,
245                callback,
246            } => {
247                let mut view = self.users.try_load_entry_mut(&id).await?;
248                view.write_batch(batch).await?;
249                callback.respond(());
250            }
251
252            OpenChain {
253                ownership,
254                balance,
255                next_message_id,
256                application_permissions,
257                callback,
258            } => {
259                let inactive_err = || SystemExecutionError::InactiveChain;
260                let config = OpenChainConfig {
261                    ownership,
262                    admin_id: self.system.admin_id.get().ok_or_else(inactive_err)?,
263                    epoch: self.system.epoch.get().ok_or_else(inactive_err)?,
264                    committees: self.system.committees.get().clone(),
265                    balance,
266                    application_permissions,
267                };
268                let messages = self.system.open_chain(config, next_message_id)?;
269                callback.respond(messages)
270            }
271
272            CloseChain {
273                application_id,
274                callback,
275            } => {
276                let app_permissions = self.system.application_permissions.get();
277                if !app_permissions.can_close_chain(&application_id) {
278                    callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
279                } else {
280                    let chain_id = self.context().extra().chain_id();
281                    self.system.close_chain(chain_id).await?;
282                    callback.respond(Ok(()));
283                }
284            }
285
286            FetchUrl { url, callback } => {
287                let bytes = reqwest::get(url).await?.bytes().await?.to_vec();
288                callback.respond(bytes);
289            }
290
291            HttpPost {
292                url,
293                content_type,
294                payload,
295                callback,
296            } => {
297                let res = Client::new()
298                    .post(url)
299                    .body(payload)
300                    .header(CONTENT_TYPE, content_type)
301                    .send()
302                    .await?;
303                let body = res.bytes().await?;
304                let bytes = body.as_ref().to_vec();
305                callback.respond(bytes);
306            }
307
308            ReadBlobContent { blob_id, callback } => {
309                let blob = self
310                    .system
311                    .read_blob_content(blob_id)
312                    .await
313                    .map_err(|_| SystemExecutionError::BlobNotFoundOnRead(blob_id))?;
314                callback.respond(blob);
315            }
316
317            AssertBlobExists { blob_id, callback } => {
318                self.system.assert_blob_exists(blob_id).await?;
319                callback.respond(())
320            }
321        }
322
323        Ok(())
324    }
325}
326
327/// Requests to the execution state.
328pub enum ExecutionRequest {
329    LoadContract {
330        id: UserApplicationId,
331        callback: Sender<(UserContractCode, UserApplicationDescription)>,
332    },
333
334    LoadService {
335        id: UserApplicationId,
336        callback: Sender<(UserServiceCode, UserApplicationDescription)>,
337    },
338
339    ChainBalance {
340        callback: Sender<Amount>,
341    },
342
343    OwnerBalance {
344        owner: Owner,
345        callback: Sender<Amount>,
346    },
347
348    OwnerBalances {
349        callback: Sender<Vec<(Owner, Amount)>>,
350    },
351
352    BalanceOwners {
353        callback: Sender<Vec<Owner>>,
354    },
355
356    Transfer {
357        source: Option<Owner>,
358        destination: Account,
359        amount: Amount,
360        signer: Option<Owner>,
361        callback: Sender<RawExecutionOutcome<SystemMessage, Amount>>,
362    },
363
364    Claim {
365        source: Account,
366        destination: Account,
367        amount: Amount,
368        signer: Option<Owner>,
369        callback: Sender<RawExecutionOutcome<SystemMessage, Amount>>,
370    },
371
372    SystemTimestamp {
373        callback: Sender<Timestamp>,
374    },
375
376    ChainOwnership {
377        callback: Sender<ChainOwnership>,
378    },
379
380    ReadValueBytes {
381        id: UserApplicationId,
382        key: Vec<u8>,
383        callback: Sender<Option<Vec<u8>>>,
384    },
385
386    ContainsKey {
387        id: UserApplicationId,
388        key: Vec<u8>,
389        callback: Sender<bool>,
390    },
391
392    ContainsKeys {
393        id: UserApplicationId,
394        keys: Vec<Vec<u8>>,
395        callback: Sender<Vec<bool>>,
396    },
397
398    ReadMultiValuesBytes {
399        id: UserApplicationId,
400        keys: Vec<Vec<u8>>,
401        callback: Sender<Vec<Option<Vec<u8>>>>,
402    },
403
404    FindKeysByPrefix {
405        id: UserApplicationId,
406        key_prefix: Vec<u8>,
407        callback: Sender<Vec<Vec<u8>>>,
408    },
409
410    FindKeyValuesByPrefix {
411        id: UserApplicationId,
412        key_prefix: Vec<u8>,
413        callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
414    },
415
416    WriteBatch {
417        id: UserApplicationId,
418        batch: Batch,
419        callback: Sender<()>,
420    },
421
422    OpenChain {
423        ownership: ChainOwnership,
424        balance: Amount,
425        next_message_id: MessageId,
426        application_permissions: ApplicationPermissions,
427        callback: Sender<[RawOutgoingMessage<SystemMessage, Amount>; 2]>,
428    },
429
430    CloseChain {
431        application_id: UserApplicationId,
432        callback: oneshot::Sender<Result<(), ExecutionError>>,
433    },
434
435    FetchUrl {
436        url: String,
437        callback: Sender<Vec<u8>>,
438    },
439
440    HttpPost {
441        url: String,
442        content_type: String,
443        payload: Vec<u8>,
444        callback: oneshot::Sender<Vec<u8>>,
445    },
446
447    ReadBlobContent {
448        blob_id: BlobId,
449        callback: Sender<BlobContent>,
450    },
451
452    AssertBlobExists {
453        blob_id: BlobId,
454        callback: Sender<()>,
455    },
456}
457
458impl Debug for ExecutionRequest {
459    fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
460        match self {
461            ExecutionRequest::LoadContract { id, .. } => formatter
462                .debug_struct("ExecutionRequest::LoadContract")
463                .field("id", id)
464                .finish_non_exhaustive(),
465
466            ExecutionRequest::LoadService { id, .. } => formatter
467                .debug_struct("ExecutionRequest::LoadService")
468                .field("id", id)
469                .finish_non_exhaustive(),
470
471            ExecutionRequest::ChainBalance { .. } => formatter
472                .debug_struct("ExecutionRequest::ChainBalance")
473                .finish_non_exhaustive(),
474
475            ExecutionRequest::OwnerBalance { owner, .. } => formatter
476                .debug_struct("ExecutionRequest::OwnerBalance")
477                .field("owner", owner)
478                .finish_non_exhaustive(),
479
480            ExecutionRequest::OwnerBalances { .. } => formatter
481                .debug_struct("ExecutionRequest::OwnerBalances")
482                .finish_non_exhaustive(),
483
484            ExecutionRequest::BalanceOwners { .. } => formatter
485                .debug_struct("ExecutionRequest::BalanceOwners")
486                .finish_non_exhaustive(),
487
488            ExecutionRequest::Transfer {
489                source,
490                destination,
491                amount,
492                signer,
493                ..
494            } => formatter
495                .debug_struct("ExecutionRequest::Transfer")
496                .field("source", source)
497                .field("destination", destination)
498                .field("amount", amount)
499                .field("signer", signer)
500                .finish_non_exhaustive(),
501
502            ExecutionRequest::Claim {
503                source,
504                destination,
505                amount,
506                signer,
507                ..
508            } => formatter
509                .debug_struct("ExecutionRequest::Claim")
510                .field("source", source)
511                .field("destination", destination)
512                .field("amount", amount)
513                .field("signer", signer)
514                .finish_non_exhaustive(),
515
516            ExecutionRequest::SystemTimestamp { .. } => formatter
517                .debug_struct("ExecutionRequest::SystemTimestamp")
518                .finish_non_exhaustive(),
519
520            ExecutionRequest::ChainOwnership { .. } => formatter
521                .debug_struct("ExecutionRequest::ChainOwnership")
522                .finish_non_exhaustive(),
523
524            ExecutionRequest::ReadValueBytes { id, key, .. } => formatter
525                .debug_struct("ExecutionRequest::ReadValueBytes")
526                .field("id", id)
527                .field("key", key)
528                .finish_non_exhaustive(),
529
530            ExecutionRequest::ContainsKey { id, key, .. } => formatter
531                .debug_struct("ExecutionRequest::ContainsKey")
532                .field("id", id)
533                .field("key", key)
534                .finish_non_exhaustive(),
535
536            ExecutionRequest::ContainsKeys { id, keys, .. } => formatter
537                .debug_struct("ExecutionRequest::ContainsKeys")
538                .field("id", id)
539                .field("keys", keys)
540                .finish_non_exhaustive(),
541
542            ExecutionRequest::ReadMultiValuesBytes { id, keys, .. } => formatter
543                .debug_struct("ExecutionRequest::ReadMultiValuesBytes")
544                .field("id", id)
545                .field("keys", keys)
546                .finish_non_exhaustive(),
547
548            ExecutionRequest::FindKeysByPrefix { id, key_prefix, .. } => formatter
549                .debug_struct("ExecutionRequest::FindKeysByPrefix")
550                .field("id", id)
551                .field("key_prefix", key_prefix)
552                .finish_non_exhaustive(),
553
554            ExecutionRequest::FindKeyValuesByPrefix { id, key_prefix, .. } => formatter
555                .debug_struct("ExecutionRequest::FindKeyValuesByPrefix")
556                .field("id", id)
557                .field("key_prefix", key_prefix)
558                .finish_non_exhaustive(),
559
560            ExecutionRequest::WriteBatch { id, batch, .. } => formatter
561                .debug_struct("ExecutionRequest::WriteBatch")
562                .field("id", id)
563                .field("batch", batch)
564                .finish_non_exhaustive(),
565
566            ExecutionRequest::OpenChain { balance, .. } => formatter
567                .debug_struct("ExecutionRequest::OpenChain")
568                .field("balance", balance)
569                .finish_non_exhaustive(),
570
571            ExecutionRequest::CloseChain { application_id, .. } => formatter
572                .debug_struct("ExecutionRequest::CloseChain")
573                .field("application_id", application_id)
574                .finish_non_exhaustive(),
575
576            ExecutionRequest::FetchUrl { url, .. } => formatter
577                .debug_struct("ExecutionRequest::FetchUrl")
578                .field("url", url)
579                .finish_non_exhaustive(),
580
581            ExecutionRequest::HttpPost {
582                url, content_type, ..
583            } => formatter
584                .debug_struct("ExecutionRequest::HttpPost")
585                .field("url", url)
586                .field("content_type", content_type)
587                .finish_non_exhaustive(),
588
589            ExecutionRequest::ReadBlobContent { blob_id, .. } => formatter
590                .debug_struct("ExecutionRequest::ReadBlob")
591                .field("blob_id", blob_id)
592                .finish_non_exhaustive(),
593
594            ExecutionRequest::AssertBlobExists { blob_id, .. } => formatter
595                .debug_struct("ExecutionRequest::AssertBlobExists")
596                .field("blob_id", blob_id)
597                .finish_non_exhaustive(),
598        }
599    }
600}