linera_execution/
system.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9#[cfg(with_metrics)]
10use std::sync::LazyLock;
11use std::{
12    collections::{BTreeMap, BTreeSet, HashSet},
13    mem,
14};
15
16use custom_debug_derive::Debug;
17use linera_base::{
18    crypto::CryptoHash,
19    data_types::{
20        Amount, ApplicationPermissions, Blob, BlobContent, BlockHeight, OracleResponse, Timestamp,
21    },
22    ensure, hex_debug,
23    identifiers::{
24        Account, AccountOwner, BlobId, BlobType, ChainDescription, ChainId, EventId, MessageId,
25        ModuleId, StreamId,
26    },
27    ownership::{ChainOwnership, TimeoutConfig},
28};
29use linera_views::{
30    context::Context,
31    map_view::{HashedMapView, MapView},
32    register_view::HashedRegisterView,
33    set_view::HashedSetView,
34    views::{ClonableView, HashableView, View, ViewError},
35};
36use serde::{Deserialize, Serialize};
37#[cfg(with_metrics)]
38use {linera_base::prometheus_util::register_int_counter_vec, prometheus::IntCounterVec};
39
40#[cfg(test)]
41use crate::test_utils::SystemExecutionState;
42use crate::{
43    committee::{Committee, Epoch},
44    ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext, MessageContext,
45    MessageKind, OperationContext, OutgoingMessage, QueryContext, QueryOutcome, ResourceController,
46    TransactionTracker,
47};
48
49/// The relative index of the `OpenChain` message created by the `OpenChain` operation.
50pub static OPEN_CHAIN_MESSAGE_INDEX: u32 = 0;
51/// The event stream name for new epochs and committees.
52pub static EPOCH_STREAM_NAME: &[u8] = &[0];
53/// The event stream name for removed epochs.
54pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
55
56/// The number of times the [`SystemOperation::OpenChain`] was executed.
57#[cfg(with_metrics)]
58static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
59    register_int_counter_vec(
60        "open_chain_count",
61        "The number of times the `OpenChain` operation was executed",
62        &[],
63    )
64});
65
66/// A view accessing the execution state of the system of a chain.
67#[derive(Debug, ClonableView, HashableView)]
68pub struct SystemExecutionStateView<C> {
69    /// How the chain was created. May be unknown for inactive chains.
70    pub description: HashedRegisterView<C, Option<ChainDescription>>,
71    /// The number identifying the current configuration.
72    pub epoch: HashedRegisterView<C, Option<Epoch>>,
73    /// The admin of the chain.
74    pub admin_id: HashedRegisterView<C, Option<ChainId>>,
75    /// The committees that we trust, indexed by epoch number.
76    // Not using a `MapView` because the set active of committees is supposed to be
77    // small. Plus, currently, we would create the `BTreeMap` anyway in various places
78    // (e.g. the `OpenChain` operation).
79    pub committees: HashedRegisterView<C, BTreeMap<Epoch, Committee>>,
80    /// Ownership of the chain.
81    pub ownership: HashedRegisterView<C, ChainOwnership>,
82    /// Balance of the chain. (Available to any user able to create blocks in the chain.)
83    pub balance: HashedRegisterView<C, Amount>,
84    /// Balances attributed to a given owner.
85    pub balances: HashedMapView<C, AccountOwner, Amount>,
86    /// The timestamp of the most recent block.
87    pub timestamp: HashedRegisterView<C, Timestamp>,
88    /// Whether this chain has been closed.
89    pub closed: HashedRegisterView<C, bool>,
90    /// Permissions for applications on this chain.
91    pub application_permissions: HashedRegisterView<C, ApplicationPermissions>,
92    /// Blobs that have been used or published on this chain.
93    pub used_blobs: HashedSetView<C, BlobId>,
94    /// The event stream subscriptions of applications on this chain.
95    pub event_subscriptions: MapView<C, (ChainId, StreamId), EventSubscriptions>,
96}
97
98/// The applications subscribing to a particular stream, and the next event index.
99#[derive(Debug, Default, Clone, Serialize, Deserialize)]
100pub struct EventSubscriptions {
101    /// The next event index, i.e. the total number of events in this stream that have already
102    /// been processed by this chain.
103    pub next_index: u32,
104    /// The applications that are subscribed to this stream.
105    pub applications: BTreeSet<ApplicationId>,
106}
107
108/// The configuration for a new chain.
109#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
110pub struct OpenChainConfig {
111    pub ownership: ChainOwnership,
112    pub admin_id: ChainId,
113    pub epoch: Epoch,
114    pub committees: BTreeMap<Epoch, Committee>,
115    pub balance: Amount,
116    pub application_permissions: ApplicationPermissions,
117}
118
119/// A system operation.
120#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
121pub enum SystemOperation {
122    /// Transfers `amount` units of value from the given owner's account to the recipient.
123    /// If no owner is given, try to take the units out of the unattributed account.
124    Transfer {
125        owner: AccountOwner,
126        recipient: Recipient,
127        amount: Amount,
128    },
129    /// Claims `amount` units of value from the given owner's account in the remote
130    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
131    /// process the message.
132    Claim {
133        owner: AccountOwner,
134        target_id: ChainId,
135        recipient: Recipient,
136        amount: Amount,
137    },
138    /// Creates (or activates) a new chain.
139    /// This will automatically subscribe to the future committees created by `admin_id`.
140    OpenChain(OpenChainConfig),
141    /// Closes the chain.
142    CloseChain,
143    /// Changes the ownership of the chain.
144    ChangeOwnership {
145        /// Super owners can propose fast blocks in the first round, and regular blocks in any round.
146        #[debug(skip_if = Vec::is_empty)]
147        super_owners: Vec<AccountOwner>,
148        /// The regular owners, with their weights that determine how often they are round leader.
149        #[debug(skip_if = Vec::is_empty)]
150        owners: Vec<(AccountOwner, u64)>,
151        /// The number of initial rounds after 0 in which all owners are allowed to propose blocks.
152        multi_leader_rounds: u32,
153        /// Whether the multi-leader rounds are unrestricted, i.e. not limited to chain owners.
154        /// This should only be `true` on chains with restrictive application permissions and an
155        /// application-based mechanism to select block proposers.
156        open_multi_leader_rounds: bool,
157        /// The timeout configuration: how long fast, multi-leader and single-leader rounds last.
158        timeout_config: TimeoutConfig,
159    },
160    /// Changes the application permissions configuration on this chain.
161    ChangeApplicationPermissions(ApplicationPermissions),
162    /// Publishes a new application module.
163    PublishModule { module_id: ModuleId },
164    /// Publishes a new data blob.
165    PublishDataBlob { blob_hash: CryptoHash },
166    /// Reads a blob and discards the result.
167    // TODO(#2490): Consider removing this.
168    ReadBlob { blob_id: BlobId },
169    /// Creates a new application.
170    CreateApplication {
171        module_id: ModuleId,
172        #[serde(with = "serde_bytes")]
173        #[debug(with = "hex_debug")]
174        parameters: Vec<u8>,
175        #[serde(with = "serde_bytes")]
176        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
177        instantiation_argument: Vec<u8>,
178        #[debug(skip_if = Vec::is_empty)]
179        required_application_ids: Vec<ApplicationId>,
180    },
181    /// Operations that are only allowed on the admin chain.
182    Admin(AdminOperation),
183    /// Processes an event about a new epoch and committee.
184    ProcessNewEpoch(Epoch),
185    /// Processes an event about a removed epoch and committee.
186    ProcessRemovedEpoch(Epoch),
187    /// Updates the event stream trackers.
188    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
189}
190
191/// Operations that are only allowed on the admin chain.
192#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
193pub enum AdminOperation {
194    /// Publishes a new committee as a blob. This can be assigned to an epoch using
195    /// [`AdminOperation::CreateCommittee`] in a later block.
196    PublishCommitteeBlob { blob_hash: CryptoHash },
197    /// Registers a new committee. Other chains can then migrate to the new epoch by executing
198    /// [`SystemOperation::ProcessNewEpoch`].
199    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
200    /// Removes a committee. Other chains should execute [`SystemOperation::ProcessRemovedEpoch`],
201    /// so that blocks from the retired epoch will not be accepted until they are followed (hence
202    /// re-certified) by a block certified by a recent committee.
203    RemoveCommittee { epoch: Epoch },
204}
205
206/// A system message meant to be executed on a remote chain.
207#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
208pub enum SystemMessage {
209    /// Credits `amount` units of value to the account `target` -- unless the message is
210    /// bouncing, in which case `source` is credited instead.
211    Credit {
212        target: AccountOwner,
213        amount: Amount,
214        source: AccountOwner,
215    },
216    /// Withdraws `amount` units of value from the account and starts a transfer to credit
217    /// the recipient. The message must be properly authenticated. Receiver chains may
218    /// refuse it depending on their configuration.
219    Withdraw {
220        owner: AccountOwner,
221        amount: Amount,
222        recipient: Recipient,
223    },
224    /// Creates (or activates) a new chain.
225    OpenChain(Box<OpenChainConfig>),
226    /// Notifies that a new application was created.
227    ApplicationCreated,
228}
229
230/// A query to the system state.
231#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
232pub struct SystemQuery;
233
234/// The response to a system query.
235#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
236pub struct SystemResponse {
237    pub chain_id: ChainId,
238    pub balance: Amount,
239}
240
241/// The recipient of a transfer.
242#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone, Serialize, Deserialize)]
243pub enum Recipient {
244    /// This is mainly a placeholder for future extensions.
245    Burn,
246    /// Transfers to the balance of the given account.
247    Account(Account),
248}
249
250impl Recipient {
251    /// Returns the default recipient for the given chain (no owner).
252    pub fn chain(chain_id: ChainId) -> Recipient {
253        Recipient::Account(Account::chain(chain_id))
254    }
255
256    /// Returns the default recipient for the root chain with the given index.
257    #[cfg(with_testing)]
258    pub fn root(index: u32) -> Recipient {
259        Recipient::chain(ChainId::root(index))
260    }
261}
262
263/// Optional user message attached to a transfer.
264#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
265pub struct UserData(pub Option<[u8; 32]>);
266
267impl UserData {
268    pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
269        // Convert the Option<String> to Option<[u8; 32]>
270        let option_array = match opt_str {
271            Some(s) => {
272                // Convert the String to a Vec<u8>
273                let vec = s.into_bytes();
274                if vec.len() <= 32 {
275                    // Create an array from the Vec<u8>
276                    let mut array = [b' '; 32];
277
278                    // Copy bytes from the vector into the array
279                    let len = vec.len().min(32);
280                    array[..len].copy_from_slice(&vec[..len]);
281
282                    Some(array)
283                } else {
284                    return Err(vec.len());
285                }
286            }
287            None => None,
288        };
289
290        // Return the UserData with the converted Option<[u8; 32]>
291        Ok(UserData(option_array))
292    }
293}
294
295#[derive(Debug)]
296pub struct CreateApplicationResult {
297    pub app_id: ApplicationId,
298    pub txn_tracker: TransactionTracker,
299}
300
301impl<C> SystemExecutionStateView<C>
302where
303    C: Context + Clone + Send + Sync + 'static,
304    C::Extra: ExecutionRuntimeContext,
305{
306    /// Invariant for the states of active chains.
307    pub fn is_active(&self) -> bool {
308        self.description.get().is_some()
309            && self.ownership.get().is_active()
310            && self.current_committee().is_some()
311            && self.admin_id.get().is_some()
312    }
313
314    /// Returns the current committee, if any.
315    pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
316        let epoch = self.epoch.get().as_ref()?;
317        let committee = self.committees.get().get(epoch)?;
318        Some((*epoch, committee))
319    }
320
321    /// Executes the sender's side of an operation and returns a list of actions to be
322    /// taken.
323    pub async fn execute_operation(
324        &mut self,
325        context: OperationContext,
326        operation: SystemOperation,
327        txn_tracker: &mut TransactionTracker,
328        resource_controller: &mut ResourceController<Option<AccountOwner>>,
329    ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
330        use SystemOperation::*;
331        let mut new_application = None;
332        match operation {
333            OpenChain(config) => {
334                let next_message_id = context.next_message_id(txn_tracker.next_message_index());
335                let message = self.open_chain(config, next_message_id).await?;
336                txn_tracker.add_outgoing_message(message)?;
337                #[cfg(with_metrics)]
338                OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
339            }
340            ChangeOwnership {
341                super_owners,
342                owners,
343                multi_leader_rounds,
344                open_multi_leader_rounds,
345                timeout_config,
346            } => {
347                self.ownership.set(ChainOwnership {
348                    super_owners: super_owners.into_iter().collect(),
349                    owners: owners.into_iter().collect(),
350                    multi_leader_rounds,
351                    open_multi_leader_rounds,
352                    timeout_config,
353                });
354            }
355            ChangeApplicationPermissions(application_permissions) => {
356                self.application_permissions.set(application_permissions);
357            }
358            CloseChain => self.close_chain().await?,
359            Transfer {
360                owner,
361                amount,
362                recipient,
363            } => {
364                let maybe_message = self
365                    .transfer(context.authenticated_signer, None, owner, recipient, amount)
366                    .await?;
367                txn_tracker.add_outgoing_messages(maybe_message)?;
368            }
369            Claim {
370                owner,
371                target_id,
372                recipient,
373                amount,
374            } => {
375                let message = self
376                    .claim(
377                        context.authenticated_signer,
378                        None,
379                        owner,
380                        target_id,
381                        recipient,
382                        amount,
383                    )
384                    .await?;
385                txn_tracker.add_outgoing_message(message)?;
386            }
387            Admin(admin_operation) => {
388                ensure!(
389                    *self.admin_id.get() == Some(context.chain_id),
390                    ExecutionError::AdminOperationOnNonAdminChain
391                );
392                match admin_operation {
393                    AdminOperation::PublishCommitteeBlob { blob_hash } => {
394                        self.blob_published(&BlobId::new(blob_hash, BlobType::Committee))?;
395                    }
396                    AdminOperation::CreateCommittee { epoch, blob_hash } => {
397                        self.check_next_epoch(epoch)?;
398                        let blob_id = BlobId::new(blob_hash, BlobType::Committee);
399                        let committee =
400                            bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
401                        self.blob_used(Some(txn_tracker), blob_id).await?;
402                        self.committees.get_mut().insert(epoch, committee);
403                        self.epoch.set(Some(epoch));
404                        txn_tracker.add_event(
405                            StreamId::system(EPOCH_STREAM_NAME),
406                            epoch.0,
407                            bcs::to_bytes(&blob_hash)?,
408                        );
409                    }
410                    AdminOperation::RemoveCommittee { epoch } => {
411                        ensure!(
412                            self.committees.get_mut().remove(&epoch).is_some(),
413                            ExecutionError::InvalidCommitteeRemoval
414                        );
415                        txn_tracker.add_event(
416                            StreamId::system(REMOVED_EPOCH_STREAM_NAME),
417                            epoch.0,
418                            vec![],
419                        );
420                    }
421                }
422            }
423            PublishModule { module_id } => {
424                for blob_id in module_id.bytecode_blob_ids() {
425                    self.blob_published(&blob_id)?;
426                }
427            }
428            CreateApplication {
429                module_id,
430                parameters,
431                instantiation_argument,
432                required_application_ids,
433            } => {
434                let txn_tracker_moved = mem::take(txn_tracker);
435                let CreateApplicationResult {
436                    app_id,
437                    txn_tracker: txn_tracker_moved,
438                } = self
439                    .create_application(
440                        context.chain_id,
441                        context.height,
442                        module_id,
443                        parameters,
444                        required_application_ids,
445                        txn_tracker_moved,
446                    )
447                    .await?;
448                *txn_tracker = txn_tracker_moved;
449                new_application = Some((app_id, instantiation_argument));
450            }
451            PublishDataBlob { blob_hash } => {
452                self.blob_published(&BlobId::new(blob_hash, BlobType::Data))?;
453            }
454            ReadBlob { blob_id } => {
455                let content = self.read_blob_content(blob_id).await?;
456                if blob_id.blob_type == BlobType::Data {
457                    resource_controller
458                        .with_state(self)
459                        .await?
460                        .track_blob_read(content.bytes().len() as u64)?;
461                }
462                self.blob_used(Some(txn_tracker), blob_id).await?;
463            }
464            ProcessNewEpoch(epoch) => {
465                self.check_next_epoch(epoch)?;
466                let admin_id = self
467                    .admin_id
468                    .get()
469                    .ok_or_else(|| ExecutionError::InactiveChain)?;
470                let event_id = EventId {
471                    chain_id: admin_id,
472                    stream_id: StreamId::system(EPOCH_STREAM_NAME),
473                    index: epoch.0,
474                };
475                let bytes = match txn_tracker.next_replayed_oracle_response()? {
476                    None => self.context().extra().get_event(event_id.clone()).await?,
477                    Some(OracleResponse::Event(recorded_event_id, bytes))
478                        if recorded_event_id == event_id =>
479                    {
480                        bytes
481                    }
482                    Some(_) => return Err(ExecutionError::OracleResponseMismatch),
483                };
484                let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
485                txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
486                let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
487                self.blob_used(Some(txn_tracker), blob_id).await?;
488                self.committees.get_mut().insert(epoch, committee);
489                self.epoch.set(Some(epoch));
490            }
491            ProcessRemovedEpoch(epoch) => {
492                ensure!(
493                    self.committees.get_mut().remove(&epoch).is_some(),
494                    ExecutionError::InvalidCommitteeRemoval
495                );
496                let admin_id = self
497                    .admin_id
498                    .get()
499                    .ok_or_else(|| ExecutionError::InactiveChain)?;
500                let event_id = EventId {
501                    chain_id: admin_id,
502                    stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
503                    index: epoch.0,
504                };
505                let bytes = match txn_tracker.next_replayed_oracle_response()? {
506                    None => self.context().extra().get_event(event_id.clone()).await?,
507                    Some(OracleResponse::Event(recorded_event_id, bytes))
508                        if recorded_event_id == event_id =>
509                    {
510                        bytes
511                    }
512                    Some(_) => return Err(ExecutionError::OracleResponseMismatch),
513                };
514                txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
515            }
516            UpdateStreams(streams) => {
517                for (chain_id, stream_id, next_index) in streams {
518                    let subscriptions = self
519                        .event_subscriptions
520                        .get_mut_or_default(&(chain_id, stream_id))
521                        .await?;
522                    subscriptions.next_index = subscriptions.next_index.max(next_index);
523                }
524            }
525        }
526
527        Ok(new_application)
528    }
529
530    /// Returns an error if the `provided` epoch is not exactly one higher than the chain's current
531    /// epoch.
532    fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
533        let expected = self.epoch.get().expect("chain is active").try_add_one()?;
534        ensure!(
535            provided == expected,
536            ExecutionError::InvalidCommitteeEpoch { provided, expected }
537        );
538        Ok(())
539    }
540
541    pub async fn transfer(
542        &mut self,
543        authenticated_signer: Option<AccountOwner>,
544        authenticated_application_id: Option<ApplicationId>,
545        source: AccountOwner,
546        recipient: Recipient,
547        amount: Amount,
548    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
549        if source == AccountOwner::CHAIN {
550            ensure!(
551                authenticated_signer.is_some()
552                    && self
553                        .ownership
554                        .get()
555                        .verify_owner(&authenticated_signer.unwrap()),
556                ExecutionError::UnauthenticatedTransferOwner
557            );
558        } else {
559            ensure!(
560                authenticated_signer == Some(source)
561                    || authenticated_application_id.map(AccountOwner::from) == Some(source),
562                ExecutionError::UnauthenticatedTransferOwner
563            );
564        }
565        ensure!(
566            amount > Amount::ZERO,
567            ExecutionError::IncorrectTransferAmount
568        );
569        self.debit(&source, amount).await?;
570        match recipient {
571            Recipient::Account(account) => {
572                let message = SystemMessage::Credit {
573                    amount,
574                    source,
575                    target: account.owner,
576                };
577                Ok(Some(
578                    OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
579                ))
580            }
581            Recipient::Burn => Ok(None),
582        }
583    }
584
585    pub async fn claim(
586        &self,
587        authenticated_signer: Option<AccountOwner>,
588        authenticated_application_id: Option<ApplicationId>,
589        source: AccountOwner,
590        target_id: ChainId,
591        recipient: Recipient,
592        amount: Amount,
593    ) -> Result<OutgoingMessage, ExecutionError> {
594        ensure!(
595            authenticated_signer == Some(source)
596                || authenticated_application_id.map(AccountOwner::from) == Some(source),
597            ExecutionError::UnauthenticatedClaimOwner
598        );
599        ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
600
601        let message = SystemMessage::Withdraw {
602            amount,
603            owner: source,
604            recipient,
605        };
606        Ok(
607            OutgoingMessage::new(target_id, message)
608                .with_authenticated_signer(authenticated_signer),
609        )
610    }
611
612    /// Debits an [`Amount`] of tokens from an account's balance.
613    async fn debit(
614        &mut self,
615        account: &AccountOwner,
616        amount: Amount,
617    ) -> Result<(), ExecutionError> {
618        let balance = if account == &AccountOwner::CHAIN {
619            self.balance.get_mut()
620        } else {
621            self.balances.get_mut(account).await?.ok_or_else(|| {
622                ExecutionError::InsufficientFunding {
623                    balance: Amount::ZERO,
624                    account: *account,
625                }
626            })?
627        };
628
629        balance
630            .try_sub_assign(amount)
631            .map_err(|_| ExecutionError::InsufficientFunding {
632                balance: *balance,
633                account: *account,
634            })?;
635
636        if account != &AccountOwner::CHAIN && balance.is_zero() {
637            self.balances.remove(account)?;
638        }
639
640        Ok(())
641    }
642
643    /// Executes a cross-chain message that represents the recipient's side of an operation.
644    pub async fn execute_message(
645        &mut self,
646        context: MessageContext,
647        message: SystemMessage,
648    ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
649        let mut outcome = Vec::new();
650        use SystemMessage::*;
651        match message {
652            Credit {
653                amount,
654                source,
655                target,
656            } => {
657                let receiver = if context.is_bouncing { source } else { target };
658                if receiver == AccountOwner::CHAIN {
659                    let new_balance = self.balance.get().saturating_add(amount);
660                    self.balance.set(new_balance);
661                } else {
662                    let balance = self.balances.get_mut_or_default(&receiver).await?;
663                    *balance = balance.saturating_add(amount);
664                }
665            }
666            Withdraw {
667                amount,
668                owner,
669                recipient,
670            } => {
671                self.debit(&owner, amount).await?;
672                match recipient {
673                    Recipient::Account(account) => {
674                        let message = SystemMessage::Credit {
675                            amount,
676                            source: owner,
677                            target: account.owner,
678                        };
679                        outcome.push(
680                            OutgoingMessage::new(account.chain_id, message)
681                                .with_kind(MessageKind::Tracked),
682                        );
683                    }
684                    Recipient::Burn => (),
685                }
686            }
687            // These messages are executed immediately when cross-chain requests are received.
688            OpenChain(_) => {}
689            // This message is only a placeholder: Its ID is part of the application ID.
690            ApplicationCreated => {}
691        }
692        Ok(outcome)
693    }
694
695    /// Initializes the system application state on a newly opened chain.
696    pub fn initialize_chain(
697        &mut self,
698        message_id: MessageId,
699        timestamp: Timestamp,
700        config: OpenChainConfig,
701    ) {
702        // Guaranteed under BFT assumptions.
703        assert!(self.description.get().is_none());
704        assert!(!self.ownership.get().is_active());
705        assert!(self.committees.get().is_empty());
706        let OpenChainConfig {
707            ownership,
708            admin_id,
709            epoch,
710            committees,
711            balance,
712            application_permissions,
713        } = config;
714        let description = ChainDescription::Child(message_id);
715        self.description.set(Some(description));
716        self.epoch.set(Some(epoch));
717        self.committees.set(committees);
718        self.admin_id.set(Some(admin_id));
719        self.ownership.set(ownership);
720        self.timestamp.set(timestamp);
721        self.balance.set(balance);
722        self.application_permissions.set(application_permissions);
723    }
724
725    pub async fn handle_query(
726        &mut self,
727        context: QueryContext,
728        _query: SystemQuery,
729    ) -> Result<QueryOutcome<SystemResponse>, ExecutionError> {
730        let response = SystemResponse {
731            chain_id: context.chain_id,
732            balance: *self.balance.get(),
733        };
734        Ok(QueryOutcome {
735            response,
736            operations: vec![],
737        })
738    }
739
740    /// Returns the messages to open a new chain, and subtracts the new chain's balance
741    /// from this chain's.
742    pub async fn open_chain(
743        &mut self,
744        config: OpenChainConfig,
745        next_message_id: MessageId,
746    ) -> Result<OutgoingMessage, ExecutionError> {
747        let child_id = ChainId::child(next_message_id);
748        ensure!(
749            self.admin_id.get().as_ref() == Some(&config.admin_id),
750            ExecutionError::InvalidNewChainAdminId(child_id)
751        );
752        ensure!(
753            self.committees.get() == &config.committees,
754            ExecutionError::InvalidCommittees
755        );
756        ensure!(
757            self.epoch.get().as_ref() == Some(&config.epoch),
758            ExecutionError::InvalidEpoch {
759                chain_id: child_id,
760                epoch: config.epoch,
761            }
762        );
763        self.debit(&AccountOwner::CHAIN, config.balance).await?;
764        let message = SystemMessage::OpenChain(Box::new(config));
765        Ok(OutgoingMessage::new(child_id, message).with_kind(MessageKind::Protected))
766    }
767
768    pub async fn close_chain(&mut self) -> Result<(), ExecutionError> {
769        self.closed.set(true);
770        Ok(())
771    }
772
773    pub async fn create_application(
774        &mut self,
775        chain_id: ChainId,
776        block_height: BlockHeight,
777        module_id: ModuleId,
778        parameters: Vec<u8>,
779        required_application_ids: Vec<ApplicationId>,
780        mut txn_tracker: TransactionTracker,
781    ) -> Result<CreateApplicationResult, ExecutionError> {
782        let application_index = txn_tracker.next_application_index();
783
784        let blob_ids = self.check_bytecode_blobs(&module_id).await?;
785        // We only remember to register the blobs that aren't recorded in `used_blobs`
786        // already.
787        for blob_id in blob_ids {
788            self.blob_used(Some(&mut txn_tracker), blob_id).await?;
789        }
790
791        let application_description = ApplicationDescription {
792            module_id,
793            creator_chain_id: chain_id,
794            block_height,
795            application_index,
796            parameters,
797            required_application_ids,
798        };
799        self.check_required_applications(&application_description, Some(&mut txn_tracker))
800            .await?;
801
802        txn_tracker.add_created_blob(Blob::new_application_description(&application_description));
803
804        Ok(CreateApplicationResult {
805            app_id: ApplicationId::from(&application_description),
806            txn_tracker,
807        })
808    }
809
810    async fn check_required_applications(
811        &mut self,
812        application_description: &ApplicationDescription,
813        mut txn_tracker: Option<&mut TransactionTracker>,
814    ) -> Result<(), ExecutionError> {
815        // Make sure that referenced applications IDs have been registered.
816        for required_id in &application_description.required_application_ids {
817            Box::pin(self.describe_application(*required_id, txn_tracker.as_deref_mut())).await?;
818        }
819        Ok(())
820    }
821
822    /// Retrieves an application's description.
823    pub async fn describe_application(
824        &mut self,
825        id: ApplicationId,
826        mut txn_tracker: Option<&mut TransactionTracker>,
827    ) -> Result<ApplicationDescription, ExecutionError> {
828        let blob_id = id.description_blob_id();
829        let blob_content = match txn_tracker
830            .as_ref()
831            .and_then(|tracker| tracker.created_blobs().get(&blob_id))
832        {
833            Some(blob) => blob.content().clone(),
834            None => self.read_blob_content(blob_id).await?,
835        };
836        self.blob_used(txn_tracker.as_deref_mut(), blob_id).await?;
837        let description: ApplicationDescription = bcs::from_bytes(blob_content.bytes())?;
838
839        let blob_ids = self.check_bytecode_blobs(&description.module_id).await?;
840        // We only remember to register the blobs that aren't recorded in `used_blobs`
841        // already.
842        for blob_id in blob_ids {
843            self.blob_used(txn_tracker.as_deref_mut(), blob_id).await?;
844        }
845
846        self.check_required_applications(&description, txn_tracker)
847            .await?;
848
849        Ok(description)
850    }
851
852    /// Retrieves the recursive dependencies of applications and applies a topological sort.
853    pub async fn find_dependencies(
854        &mut self,
855        mut stack: Vec<ApplicationId>,
856        txn_tracker: &mut TransactionTracker,
857    ) -> Result<Vec<ApplicationId>, ExecutionError> {
858        // What we return at the end.
859        let mut result = Vec::new();
860        // The entries already inserted in `result`.
861        let mut sorted = HashSet::new();
862        // The entries for which dependencies have already been pushed once to the stack.
863        let mut seen = HashSet::new();
864
865        while let Some(id) = stack.pop() {
866            if sorted.contains(&id) {
867                continue;
868            }
869            if seen.contains(&id) {
870                // Second time we see this entry. It was last pushed just before its
871                // dependencies -- which are now fully sorted.
872                sorted.insert(id);
873                result.push(id);
874                continue;
875            }
876            // First time we see this entry:
877            // 1. Mark it so that its dependencies are no longer pushed to the stack.
878            seen.insert(id);
879            // 2. Schedule all the (yet unseen) dependencies, then this entry for a second visit.
880            stack.push(id);
881            let app = self.describe_application(id, Some(txn_tracker)).await?;
882            for child in app.required_application_ids.iter().rev() {
883                if !seen.contains(child) {
884                    stack.push(*child);
885                }
886            }
887        }
888        Ok(result)
889    }
890
891    /// Records a blob that is used in this block. If this is the first use on this chain, creates
892    /// an oracle response for it.
893    pub(crate) async fn blob_used(
894        &mut self,
895        maybe_txn_tracker: Option<&mut TransactionTracker>,
896        blob_id: BlobId,
897    ) -> Result<bool, ExecutionError> {
898        if self.used_blobs.contains(&blob_id).await? {
899            return Ok(false); // Nothing to do.
900        }
901        self.used_blobs.insert(&blob_id)?;
902        if let Some(txn_tracker) = maybe_txn_tracker {
903            txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
904        }
905        Ok(true)
906    }
907
908    /// Records a blob that is published in this block. This does not create an oracle entry, and
909    /// the blob can be used without using an oracle in the future on this chain.
910    fn blob_published(&mut self, blob_id: &BlobId) -> Result<(), ExecutionError> {
911        self.used_blobs.insert(blob_id)?;
912        Ok(())
913    }
914
915    pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
916        match self.context().extra().get_blob(blob_id).await {
917            Ok(blob) => Ok(blob.into()),
918            Err(ViewError::BlobsNotFound(_)) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
919            Err(error) => Err(error.into()),
920        }
921    }
922
923    pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
924        if self.context().extra().contains_blob(blob_id).await? {
925            Ok(())
926        } else {
927            Err(ExecutionError::BlobsNotFound(vec![blob_id]))
928        }
929    }
930
931    async fn check_bytecode_blobs(
932        &mut self,
933        module_id: &ModuleId,
934    ) -> Result<Vec<BlobId>, ExecutionError> {
935        let blob_ids = module_id.bytecode_blob_ids();
936
937        let mut missing_blobs = Vec::new();
938        for blob_id in &blob_ids {
939            if !self.context().extra().contains_blob(*blob_id).await? {
940                missing_blobs.push(*blob_id);
941            }
942        }
943        ensure!(
944            missing_blobs.is_empty(),
945            ExecutionError::BlobsNotFound(missing_blobs)
946        );
947
948        Ok(blob_ids)
949    }
950}