Skip to main content

linera_execution/
system.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9use std::{
10    collections::{BTreeMap, BTreeSet, HashSet},
11    sync::Arc,
12};
13
14use allocative::Allocative;
15use custom_debug_derive::Debug;
16use linera_base::{
17    crypto::CryptoHash,
18    data_types::{
19        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
20        ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
21    },
22    ensure, hex_debug,
23    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, StreamId},
24    ownership::{ChainOwnership, TimeoutConfig},
25};
26use linera_views::{
27    context::Context,
28    lazy_register_view::HashedLazyRegisterView,
29    map_view::HashedMapView,
30    register_view::HashedRegisterView,
31    set_view::HashedSetView,
32    views::{ClonableView, HashableView, ReplaceContext, View},
33    ViewError,
34};
35use serde::{Deserialize, Serialize};
36
37#[cfg(test)]
38use crate::test_utils::SystemExecutionState;
39use crate::{
40    committee::Committee, util::OracleResponseExt as _, ApplicationDescription, ApplicationId,
41    ExecutionError, ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext,
42    OutgoingMessage, QueryContext, QueryOutcome, ResourceController, TransactionTracker,
43};
44
/// The event stream name for new epochs and committees.
///
/// Events on this stream are indexed by epoch number; when emitted by
/// [`AdminOperation::CreateCommittee`] the payload is the BCS-serialized hash of the
/// committee blob.
pub static EPOCH_STREAM_NAME: &[u8] = &[0];
/// The event stream name for removed epochs.
///
/// Events on this stream are indexed by epoch number; when emitted by
/// [`AdminOperation::RemoveCommittee`] the payload is empty.
pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
49
/// Prometheus metrics recorded while executing system operations.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// The number of times the [`super::SystemOperation::OpenChain`] operation was executed.
    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "open_chain_count",
            "The number of times the `OpenChain` operation was executed",
            &[],
        )
    });
}
66
67/// A view accessing the execution state of the system of a chain.
68#[derive(Debug, ClonableView, HashableView, Allocative)]
69#[allocative(bound = "C")]
70pub struct SystemExecutionStateView<C> {
71    /// How the chain was created. May be unknown for inactive chains.
72    pub description: HashedLazyRegisterView<C, Option<ChainDescription>>,
73    /// The number identifying the current configuration.
74    pub epoch: HashedRegisterView<C, Epoch>,
75    /// The admin of the chain.
76    pub admin_chain_id: HashedRegisterView<C, Option<ChainId>>,
77    /// The committees that we trust, indexed by epoch number.
78    // Not using a `MapView` because the set active of committees is supposed to be
79    // small. Plus, currently, we would create the `BTreeMap` anyway in various places
80    // (e.g. the `OpenChain` operation).
81    pub committees: HashedRegisterView<C, BTreeMap<Epoch, Committee>>,
82    /// Ownership of the chain.
83    pub ownership: HashedLazyRegisterView<C, ChainOwnership>,
84    /// Balance of the chain. (Available to any user able to create blocks in the chain.)
85    pub balance: HashedRegisterView<C, Amount>,
86    /// Balances attributed to a given owner.
87    pub balances: HashedMapView<C, AccountOwner, Amount>,
88    /// The timestamp of the most recent block.
89    pub timestamp: HashedRegisterView<C, Timestamp>,
90    /// Whether this chain has been closed.
91    pub closed: HashedRegisterView<C, bool>,
92    /// Permissions for applications on this chain.
93    pub application_permissions: HashedLazyRegisterView<C, ApplicationPermissions>,
94    /// Blobs that have been used or published on this chain.
95    pub used_blobs: HashedSetView<C, BlobId>,
96    /// The event stream subscriptions of applications on this chain.
97    pub event_subscriptions: HashedMapView<C, (ChainId, StreamId), EventSubscriptions>,
98}
99
100impl<C: Context, C2: Context> ReplaceContext<C2> for SystemExecutionStateView<C> {
101    type Target = SystemExecutionStateView<C2>;
102
103    async fn with_context(
104        &mut self,
105        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
106    ) -> Self::Target {
107        SystemExecutionStateView {
108            description: self.description.with_context(ctx.clone()).await,
109            epoch: self.epoch.with_context(ctx.clone()).await,
110            admin_chain_id: self.admin_chain_id.with_context(ctx.clone()).await,
111            committees: self.committees.with_context(ctx.clone()).await,
112            ownership: self.ownership.with_context(ctx.clone()).await,
113            balance: self.balance.with_context(ctx.clone()).await,
114            balances: self.balances.with_context(ctx.clone()).await,
115            timestamp: self.timestamp.with_context(ctx.clone()).await,
116            closed: self.closed.with_context(ctx.clone()).await,
117            application_permissions: self.application_permissions.with_context(ctx.clone()).await,
118            used_blobs: self.used_blobs.with_context(ctx.clone()).await,
119            event_subscriptions: self.event_subscriptions.with_context(ctx.clone()).await,
120        }
121    }
122}
123
124/// The applications subscribing to a particular stream, and the next event index.
125#[derive(Debug, Default, Clone, Serialize, Deserialize, Allocative)]
126pub struct EventSubscriptions {
127    /// The next event index, i.e. the total number of events in this stream that have already
128    /// been processed by this chain.
129    pub next_index: u32,
130    /// The applications that are subscribed to this stream.
131    pub applications: BTreeSet<ApplicationId>,
132}
133
134/// The initial configuration for a new chain.
135#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
136pub struct OpenChainConfig {
137    /// The ownership configuration of the new chain.
138    pub ownership: ChainOwnership,
139    /// The initial chain balance.
140    pub balance: Amount,
141    /// The initial application permissions.
142    pub application_permissions: ApplicationPermissions,
143}
144
145impl OpenChainConfig {
146    /// Creates an [`InitialChainConfig`] based on this [`OpenChainConfig`] and additional
147    /// parameters.
148    pub fn init_chain_config(
149        &self,
150        epoch: Epoch,
151        min_active_epoch: Epoch,
152        max_active_epoch: Epoch,
153    ) -> InitialChainConfig {
154        InitialChainConfig {
155            application_permissions: self.application_permissions.clone(),
156            balance: self.balance,
157            epoch,
158            min_active_epoch,
159            max_active_epoch,
160            ownership: self.ownership.clone(),
161        }
162    }
163}
164
165/// A system operation.
166#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
167pub enum SystemOperation {
168    /// Transfers `amount` units of value from the given owner's account to the recipient.
169    /// If no owner is given, try to take the units out of the unattributed account.
170    Transfer {
171        owner: AccountOwner,
172        recipient: Account,
173        amount: Amount,
174    },
175    /// Claims `amount` units of value from the given owner's account in the remote
176    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
177    /// process the message.
178    Claim {
179        owner: AccountOwner,
180        target_id: ChainId,
181        recipient: Account,
182        amount: Amount,
183    },
184    /// Creates (or activates) a new chain.
185    /// This will automatically subscribe to the future committees created by `admin_chain_id`.
186    OpenChain(OpenChainConfig),
187    /// Closes the chain.
188    CloseChain,
189    /// Changes the ownership of the chain.
190    ChangeOwnership {
191        /// Super owners can propose fast blocks in the first round, and regular blocks in any round.
192        #[debug(skip_if = Vec::is_empty)]
193        super_owners: Vec<AccountOwner>,
194        /// The regular owners, with their weights that determine how often they are round leader.
195        #[debug(skip_if = Vec::is_empty)]
196        owners: Vec<(AccountOwner, u64)>,
197        /// The number of initial rounds after 0 in which all owners are allowed to propose blocks.
198        multi_leader_rounds: u32,
199        /// Whether the multi-leader rounds are unrestricted, i.e. not limited to chain owners.
200        /// This should only be `true` on chains with restrictive application permissions and an
201        /// application-based mechanism to select block proposers.
202        open_multi_leader_rounds: bool,
203        /// The timeout configuration: how long fast, multi-leader and single-leader rounds last.
204        timeout_config: TimeoutConfig,
205    },
206    /// Changes the application permissions configuration on this chain.
207    ChangeApplicationPermissions(ApplicationPermissions),
208    /// Publishes a new application module.
209    PublishModule { module_id: ModuleId },
210    /// Publishes a new data blob.
211    PublishDataBlob { blob_hash: CryptoHash },
212    /// Verifies that the given blob exists. Otherwise the block fails.
213    VerifyBlob { blob_id: BlobId },
214    /// Creates a new application.
215    CreateApplication {
216        module_id: ModuleId,
217        #[serde(with = "serde_bytes")]
218        #[debug(with = "hex_debug")]
219        parameters: Vec<u8>,
220        #[serde(with = "serde_bytes")]
221        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
222        instantiation_argument: Vec<u8>,
223        #[debug(skip_if = Vec::is_empty)]
224        required_application_ids: Vec<ApplicationId>,
225    },
226    /// Operations that are only allowed on the admin chain.
227    Admin(AdminOperation),
228    /// Processes an event about a new epoch and committee.
229    ProcessNewEpoch(Epoch),
230    /// Processes an event about a removed epoch and committee.
231    ProcessRemovedEpoch(Epoch),
232    /// Updates the event stream trackers.
233    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
234}
235
236/// Operations that are only allowed on the admin chain.
237#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
238pub enum AdminOperation {
239    /// Publishes a new committee as a blob. This can be assigned to an epoch using
240    /// [`AdminOperation::CreateCommittee`] in a later block.
241    PublishCommitteeBlob { blob_hash: CryptoHash },
242    /// Registers a new committee. Other chains can then migrate to the new epoch by executing
243    /// [`SystemOperation::ProcessNewEpoch`].
244    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
245    /// Removes a committee. Other chains should execute [`SystemOperation::ProcessRemovedEpoch`],
246    /// so that blocks from the retired epoch will not be accepted until they are followed (hence
247    /// re-certified) by a block certified by a recent committee.
248    RemoveCommittee { epoch: Epoch },
249}
250
251/// A system message meant to be executed on a remote chain.
252#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
253pub enum SystemMessage {
254    /// Credits `amount` units of value to the account `target` -- unless the message is
255    /// bouncing, in which case `source` is credited instead.
256    Credit {
257        target: AccountOwner,
258        amount: Amount,
259        source: AccountOwner,
260    },
261    /// Withdraws `amount` units of value from the account and starts a transfer to credit
262    /// the recipient. The message must be properly authenticated. Receiver chains may
263    /// refuse it depending on their configuration.
264    Withdraw {
265        owner: AccountOwner,
266        amount: Amount,
267        recipient: Account,
268    },
269}
270
271/// A query to the system state.
272#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
273pub struct SystemQuery;
274
275/// The response to a system query.
276#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
277pub struct SystemResponse {
278    pub chain_id: ChainId,
279    pub balance: Amount,
280}
281
282/// Optional user message attached to a transfer.
283#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
284pub struct UserData(pub Option<[u8; 32]>);
285
286impl UserData {
287    pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
288        // Convert the Option<String> to Option<[u8; 32]>
289        let option_array = match opt_str {
290            Some(s) => {
291                // Convert the String to a Vec<u8>
292                let vec = s.into_bytes();
293                if vec.len() <= 32 {
294                    // Create an array from the Vec<u8>
295                    let mut array = [b' '; 32];
296
297                    // Copy bytes from the vector into the array
298                    let len = vec.len().min(32);
299                    array[..len].copy_from_slice(&vec[..len]);
300
301                    Some(array)
302                } else {
303                    return Err(vec.len());
304                }
305            }
306            None => None,
307        };
308
309        // Return the UserData with the converted Option<[u8; 32]>
310        Ok(UserData(option_array))
311    }
312}
313
314#[derive(Debug)]
315pub struct CreateApplicationResult {
316    pub app_id: ApplicationId,
317}
318
319impl<C> SystemExecutionStateView<C>
320where
321    C: Context + Clone + 'static,
322    C::Extra: ExecutionRuntimeContext,
323{
324    /// Invariant for the states of active chains.
325    pub async fn is_active(&self) -> Result<bool, ViewError> {
326        Ok(self.description.get().await?.is_some()
327            && self.ownership.get().await?.is_active()
328            && self.current_committee().is_some()
329            && self.admin_chain_id.get().is_some())
330    }
331
332    /// Returns the current committee, if any.
333    pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
334        let epoch = self.epoch.get();
335        let committee = self.committees.get().get(epoch)?;
336        Some((*epoch, committee))
337    }
338
339    async fn get_event(&self, event_id: EventId) -> Result<Arc<Vec<u8>>, ExecutionError> {
340        match self.context().extra().get_event(event_id.clone()).await? {
341            None => Err(ExecutionError::EventsNotFound(vec![event_id])),
342            Some(vec) => Ok(vec),
343        }
344    }
345
346    /// Executes the sender's side of an operation and returns a list of actions to be
347    /// taken.
348    pub async fn execute_operation(
349        &mut self,
350        context: OperationContext,
351        operation: SystemOperation,
352        txn_tracker: &mut TransactionTracker,
353        resource_controller: &mut ResourceController<Option<AccountOwner>>,
354    ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
355        use SystemOperation::*;
356        let mut new_application = None;
357        match operation {
358            OpenChain(config) => {
359                let _chain_id = self
360                    .open_chain(
361                        config,
362                        context.chain_id,
363                        context.height,
364                        context.timestamp,
365                        txn_tracker,
366                    )
367                    .await?;
368                #[cfg(with_metrics)]
369                metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
370            }
371            ChangeOwnership {
372                super_owners,
373                owners,
374                multi_leader_rounds,
375                open_multi_leader_rounds,
376                timeout_config,
377            } => {
378                self.ownership.set(ChainOwnership {
379                    super_owners: super_owners.into_iter().collect(),
380                    owners: owners.into_iter().collect(),
381                    multi_leader_rounds,
382                    open_multi_leader_rounds,
383                    timeout_config,
384                });
385            }
386            ChangeApplicationPermissions(application_permissions) => {
387                self.application_permissions.set(application_permissions);
388            }
389            CloseChain => self.close_chain()?,
390            Transfer {
391                owner,
392                amount,
393                recipient,
394            } => {
395                let maybe_message = self
396                    .transfer(context.authenticated_signer, None, owner, recipient, amount)
397                    .await?;
398                txn_tracker.add_outgoing_messages(maybe_message);
399            }
400            Claim {
401                owner,
402                target_id,
403                recipient,
404                amount,
405            } => {
406                let maybe_message = self
407                    .claim(
408                        context.authenticated_signer,
409                        None,
410                        owner,
411                        target_id,
412                        recipient,
413                        amount,
414                    )
415                    .await?;
416                txn_tracker.add_outgoing_messages(maybe_message);
417            }
418            Admin(admin_operation) => {
419                ensure!(
420                    *self.admin_chain_id.get() == Some(context.chain_id),
421                    ExecutionError::AdminOperationOnNonAdminChain
422                );
423                match admin_operation {
424                    AdminOperation::PublishCommitteeBlob { blob_hash } => {
425                        self.blob_published(
426                            &BlobId::new(blob_hash, BlobType::Committee),
427                            txn_tracker,
428                        )?;
429                    }
430                    AdminOperation::CreateCommittee { epoch, blob_hash } => {
431                        self.check_next_epoch(epoch)?;
432                        let blob_id = BlobId::new(blob_hash, BlobType::Committee);
433                        let committee =
434                            bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
435                        self.blob_used(txn_tracker, blob_id).await?;
436                        self.committees.get_mut().insert(epoch, committee);
437                        self.epoch.set(epoch);
438                        txn_tracker.add_event(
439                            StreamId::system(EPOCH_STREAM_NAME),
440                            epoch.0,
441                            bcs::to_bytes(&blob_hash)?,
442                        );
443                    }
444                    AdminOperation::RemoveCommittee { epoch } => {
445                        ensure!(
446                            self.committees.get_mut().remove(&epoch).is_some(),
447                            ExecutionError::InvalidCommitteeRemoval
448                        );
449                        txn_tracker.add_event(
450                            StreamId::system(REMOVED_EPOCH_STREAM_NAME),
451                            epoch.0,
452                            vec![],
453                        );
454                    }
455                }
456            }
457            PublishModule { module_id } => {
458                for blob_id in module_id.bytecode_blob_ids() {
459                    self.blob_published(&blob_id, txn_tracker)?;
460                }
461            }
462            CreateApplication {
463                module_id,
464                parameters,
465                instantiation_argument,
466                required_application_ids,
467            } => {
468                let CreateApplicationResult { app_id } = self
469                    .create_application(
470                        context.chain_id,
471                        context.height,
472                        module_id,
473                        parameters,
474                        required_application_ids,
475                        txn_tracker,
476                    )
477                    .await?;
478                new_application = Some((app_id, instantiation_argument));
479            }
480            PublishDataBlob { blob_hash } => {
481                self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
482            }
483            VerifyBlob { blob_id } => {
484                self.assert_blob_exists(blob_id).await?;
485                resource_controller
486                    .with_state(self)
487                    .await?
488                    .track_blob_read(0)?;
489                self.blob_used(txn_tracker, blob_id).await?;
490            }
491            ProcessNewEpoch(epoch) => {
492                self.check_next_epoch(epoch)?;
493                let admin_chain_id = self
494                    .admin_chain_id
495                    .get()
496                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
497                let event_id = EventId {
498                    chain_id: admin_chain_id,
499                    stream_id: StreamId::system(EPOCH_STREAM_NAME),
500                    index: epoch.0,
501                };
502                let bytes = txn_tracker
503                    .oracle(|| async {
504                        let bytes = self.get_event(event_id.clone()).await?;
505                        Ok(OracleResponse::Event(
506                            event_id.clone(),
507                            Arc::unwrap_or_clone(bytes),
508                        ))
509                    })
510                    .await?
511                    .to_event(&event_id)?;
512                let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
513                let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
514                self.blob_used(txn_tracker, blob_id).await?;
515                self.committees.get_mut().insert(epoch, committee);
516                self.epoch.set(epoch);
517            }
518            ProcessRemovedEpoch(epoch) => {
519                ensure!(
520                    self.committees.get_mut().remove(&epoch).is_some(),
521                    ExecutionError::InvalidCommitteeRemoval
522                );
523                let admin_chain_id = self
524                    .admin_chain_id
525                    .get()
526                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
527                let event_id = EventId {
528                    chain_id: admin_chain_id,
529                    stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
530                    index: epoch.0,
531                };
532                txn_tracker
533                    .oracle(|| async {
534                        let bytes = self.get_event(event_id.clone()).await?;
535                        Ok(OracleResponse::Event(event_id, Arc::unwrap_or_clone(bytes)))
536                    })
537                    .await?;
538            }
539            UpdateStreams(streams) => {
540                let mut missing_events = Vec::new();
541                for (chain_id, stream_id, next_index) in streams {
542                    let subscriptions = self
543                        .event_subscriptions
544                        .get_mut_or_default(&(chain_id, stream_id.clone()))
545                        .await?;
546                    ensure!(
547                        subscriptions.next_index < next_index,
548                        ExecutionError::OutdatedUpdateStreams
549                    );
550                    for application_id in &subscriptions.applications {
551                        txn_tracker.add_stream_to_process(
552                            *application_id,
553                            chain_id,
554                            stream_id.clone(),
555                            subscriptions.next_index,
556                            next_index,
557                        );
558                    }
559                    subscriptions.next_index = next_index;
560                    let index = next_index
561                        .checked_sub(1)
562                        .ok_or(ArithmeticError::Underflow)?;
563                    let event_id = EventId {
564                        chain_id,
565                        stream_id,
566                        index,
567                    };
568                    let extra = self.context().extra();
569                    txn_tracker
570                        .oracle(|| async {
571                            if !extra.contains_event(event_id.clone()).await? {
572                                missing_events.push(event_id.clone());
573                            }
574                            Ok(OracleResponse::EventExists(event_id))
575                        })
576                        .await?;
577                }
578                ensure!(
579                    missing_events.is_empty(),
580                    ExecutionError::EventsNotFound(missing_events)
581                );
582            }
583        }
584
585        Ok(new_application)
586    }
587
588    /// Returns an error if the `provided` epoch is not exactly one higher than the chain's current
589    /// epoch.
590    fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
591        let expected = self.epoch.get().try_add_one()?;
592        ensure!(
593            provided == expected,
594            ExecutionError::InvalidCommitteeEpoch { provided, expected }
595        );
596        Ok(())
597    }
598
599    async fn credit(&mut self, owner: &AccountOwner, amount: Amount) -> Result<(), ExecutionError> {
600        if owner == &AccountOwner::CHAIN {
601            let new_balance = self.balance.get().saturating_add(amount);
602            self.balance.set(new_balance);
603        } else {
604            let balance = self.balances.get_mut_or_default(owner).await?;
605            *balance = balance.saturating_add(amount);
606        }
607        Ok(())
608    }
609
610    async fn credit_or_send_message(
611        &mut self,
612        source: AccountOwner,
613        recipient: Account,
614        amount: Amount,
615    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
616        let source_chain_id = self.context().extra().chain_id();
617        if recipient.chain_id == source_chain_id {
618            // Handle same-chain transfer locally.
619            let target = recipient.owner;
620            self.credit(&target, amount).await?;
621            Ok(None)
622        } else {
623            // Handle cross-chain transfer with message.
624            let message = SystemMessage::Credit {
625                amount,
626                source,
627                target: recipient.owner,
628            };
629            Ok(Some(
630                OutgoingMessage::new(recipient.chain_id, message).with_kind(MessageKind::Tracked),
631            ))
632        }
633    }
634
635    pub async fn transfer(
636        &mut self,
637        authenticated_signer: Option<AccountOwner>,
638        authenticated_application_id: Option<ApplicationId>,
639        source: AccountOwner,
640        recipient: Account,
641        amount: Amount,
642    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
643        if source == AccountOwner::CHAIN {
644            ensure!(
645                authenticated_signer.is_some()
646                    && self
647                        .ownership
648                        .get()
649                        .await?
650                        .verify_owner(&authenticated_signer.unwrap()),
651                ExecutionError::UnauthenticatedTransferOwner
652            );
653        } else {
654            ensure!(
655                authenticated_signer == Some(source)
656                    || authenticated_application_id.map(AccountOwner::from) == Some(source),
657                ExecutionError::UnauthenticatedTransferOwner
658            );
659        }
660        ensure!(
661            amount > Amount::ZERO,
662            ExecutionError::IncorrectTransferAmount
663        );
664        self.debit(&source, amount).await?;
665        self.credit_or_send_message(source, recipient, amount).await
666    }
667
668    pub async fn claim(
669        &mut self,
670        authenticated_signer: Option<AccountOwner>,
671        authenticated_application_id: Option<ApplicationId>,
672        source: AccountOwner,
673        target_id: ChainId,
674        recipient: Account,
675        amount: Amount,
676    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
677        ensure!(
678            authenticated_signer == Some(source)
679                || authenticated_application_id.map(AccountOwner::from) == Some(source),
680            ExecutionError::UnauthenticatedClaimOwner
681        );
682        ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
683
684        let current_chain_id = self.context().extra().chain_id();
685        if target_id == current_chain_id {
686            // Handle same-chain claim locally by processing the withdraw operation directly
687            self.debit(&source, amount).await?;
688            self.credit_or_send_message(source, recipient, amount).await
689        } else {
690            // Handle cross-chain claim with Withdraw message
691            let message = SystemMessage::Withdraw {
692                amount,
693                owner: source,
694                recipient,
695            };
696            Ok(Some(
697                OutgoingMessage::new(target_id, message)
698                    .with_authenticated_signer(authenticated_signer),
699            ))
700        }
701    }
702
703    /// Debits an [`Amount`] of tokens from an account's balance.
704    async fn debit(
705        &mut self,
706        account: &AccountOwner,
707        amount: Amount,
708    ) -> Result<(), ExecutionError> {
709        let balance = if account == &AccountOwner::CHAIN {
710            self.balance.get_mut()
711        } else {
712            self.balances.get_mut(account).await?.ok_or_else(|| {
713                ExecutionError::InsufficientBalance {
714                    balance: Amount::ZERO,
715                    account: *account,
716                }
717            })?
718        };
719
720        balance
721            .try_sub_assign(amount)
722            .map_err(|_| ExecutionError::InsufficientBalance {
723                balance: *balance,
724                account: *account,
725            })?;
726
727        if account != &AccountOwner::CHAIN && balance.is_zero() {
728            self.balances.remove(account)?;
729        }
730
731        Ok(())
732    }
733
734    /// Executes a cross-chain message that represents the recipient's side of an operation.
735    pub async fn execute_message(
736        &mut self,
737        context: MessageContext,
738        message: SystemMessage,
739    ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
740        let mut outcome = Vec::new();
741        use SystemMessage::*;
742        match message {
743            Credit {
744                amount,
745                source,
746                target,
747            } => {
748                let receiver = if context.is_bouncing { source } else { target };
749                self.credit(&receiver, amount).await?;
750            }
751            Withdraw {
752                amount,
753                owner,
754                recipient,
755            } => {
756                self.debit(&owner, amount).await?;
757                if let Some(message) = self
758                    .credit_or_send_message(owner, recipient, amount)
759                    .await?
760                {
761                    outcome.push(message);
762                }
763            }
764        }
765        Ok(outcome)
766    }
767
768    /// Initializes the system application state on a newly opened chain.
769    /// Returns `Ok(true)` if the chain was already initialized, `Ok(false)` if it wasn't.
770    pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
771        if self.description.get().await?.is_some() {
772            // already initialized
773            return Ok(true);
774        }
775        let description_blob = self
776            .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
777            .await?;
778        let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
779        let InitialChainConfig {
780            ownership,
781            epoch,
782            balance,
783            min_active_epoch,
784            max_active_epoch,
785            application_permissions,
786        } = description.config().clone();
787        self.timestamp.set(description.timestamp());
788        self.description.set(Some(description));
789        self.epoch.set(epoch);
790        let committees = self
791            .context()
792            .extra()
793            .get_committees(min_active_epoch..=max_active_epoch)
794            .await?;
795        let admin_chain_id = self
796            .context()
797            .extra()
798            .get_network_description()
799            .await?
800            .ok_or(ExecutionError::NoNetworkDescriptionFound)?
801            .admin_chain_id;
802
803        self.committees.set(committees);
804        self.admin_chain_id.set(Some(admin_chain_id));
805        self.ownership.set(ownership);
806        self.balance.set(balance);
807        self.application_permissions.set(application_permissions);
808        Ok(false)
809    }
810
811    pub fn handle_query(
812        &mut self,
813        context: QueryContext,
814        _query: SystemQuery,
815    ) -> Result<QueryOutcome<SystemResponse>, ExecutionError> {
816        let response = SystemResponse {
817            chain_id: context.chain_id,
818            balance: *self.balance.get(),
819        };
820        Ok(QueryOutcome {
821            response,
822            operations: vec![],
823        })
824    }
825
826    /// Returns the messages to open a new chain, and subtracts the new chain's balance
827    /// from this chain's.
828    pub async fn open_chain(
829        &mut self,
830        config: OpenChainConfig,
831        parent: ChainId,
832        block_height: BlockHeight,
833        timestamp: Timestamp,
834        txn_tracker: &mut TransactionTracker,
835    ) -> Result<ChainId, ExecutionError> {
836        let chain_index = txn_tracker.next_chain_index();
837        let chain_origin = ChainOrigin::Child {
838            parent,
839            block_height,
840            chain_index,
841        };
842        let committees = self.committees.get();
843        let init_chain_config = config.init_chain_config(
844            *self.epoch.get(),
845            committees.keys().min().copied().unwrap_or(Epoch::ZERO),
846            committees.keys().max().copied().unwrap_or(Epoch::ZERO),
847        );
848        let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
849        let child_id = chain_description.id();
850        self.debit(&AccountOwner::CHAIN, config.balance).await?;
851        let blob = Blob::new_chain_description(&chain_description);
852        txn_tracker.add_created_blob(blob);
853        Ok(child_id)
854    }
855
856    pub fn close_chain(&mut self) -> Result<(), ExecutionError> {
857        self.closed.set(true);
858        Ok(())
859    }
860
861    pub async fn create_application(
862        &mut self,
863        chain_id: ChainId,
864        block_height: BlockHeight,
865        module_id: ModuleId,
866        parameters: Vec<u8>,
867        required_application_ids: Vec<ApplicationId>,
868        txn_tracker: &mut TransactionTracker,
869    ) -> Result<CreateApplicationResult, ExecutionError> {
870        let application_index = txn_tracker.next_application_index();
871
872        let blob_ids = self.check_bytecode_blobs(&module_id, txn_tracker).await?;
873        // We only remember to register the blobs that aren't recorded in `used_blobs`
874        // already.
875        for blob_id in blob_ids {
876            self.blob_used(txn_tracker, blob_id).await?;
877        }
878
879        let application_description = ApplicationDescription {
880            module_id,
881            creator_chain_id: chain_id,
882            block_height,
883            application_index,
884            parameters,
885            required_application_ids,
886        };
887        self.check_required_applications(&application_description, txn_tracker)
888            .await?;
889
890        let blob = Blob::new_application_description(&application_description);
891        self.used_blobs.insert(&blob.id())?;
892        txn_tracker.add_created_blob(blob);
893
894        Ok(CreateApplicationResult {
895            app_id: ApplicationId::from(&application_description),
896        })
897    }
898
899    async fn check_required_applications(
900        &mut self,
901        application_description: &ApplicationDescription,
902        txn_tracker: &mut TransactionTracker,
903    ) -> Result<(), ExecutionError> {
904        // Make sure that referenced applications IDs have been registered.
905        for required_id in &application_description.required_application_ids {
906            Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
907        }
908        Ok(())
909    }
910
911    /// Retrieves an application's description.
912    pub async fn describe_application(
913        &mut self,
914        id: ApplicationId,
915        txn_tracker: &mut TransactionTracker,
916    ) -> Result<ApplicationDescription, ExecutionError> {
917        let blob_id = id.description_blob_id();
918        let content = match txn_tracker.created_blobs().get(&blob_id) {
919            Some(content) => content.clone(),
920            None => self.read_blob_content(blob_id).await?,
921        };
922        self.blob_used(txn_tracker, blob_id).await?;
923        let description: ApplicationDescription = bcs::from_bytes(content.bytes())?;
924
925        let blob_ids = self
926            .check_bytecode_blobs(&description.module_id, txn_tracker)
927            .await?;
928        // We only remember to register the blobs that aren't recorded in `used_blobs`
929        // already.
930        for blob_id in blob_ids {
931            self.blob_used(txn_tracker, blob_id).await?;
932        }
933
934        self.check_required_applications(&description, txn_tracker)
935            .await?;
936
937        Ok(description)
938    }
939
940    /// Retrieves the recursive dependencies of applications and applies a topological sort.
941    pub async fn find_dependencies(
942        &mut self,
943        mut stack: Vec<ApplicationId>,
944        txn_tracker: &mut TransactionTracker,
945    ) -> Result<Vec<ApplicationId>, ExecutionError> {
946        // What we return at the end.
947        let mut result = Vec::new();
948        // The entries already inserted in `result`.
949        let mut sorted = HashSet::new();
950        // The entries for which dependencies have already been pushed once to the stack.
951        let mut seen = HashSet::new();
952
953        while let Some(id) = stack.pop() {
954            if sorted.contains(&id) {
955                continue;
956            }
957            if seen.contains(&id) {
958                // Second time we see this entry. It was last pushed just before its
959                // dependencies -- which are now fully sorted.
960                sorted.insert(id);
961                result.push(id);
962                continue;
963            }
964            // First time we see this entry:
965            // 1. Mark it so that its dependencies are no longer pushed to the stack.
966            seen.insert(id);
967            // 2. Schedule all the (yet unseen) dependencies, then this entry for a second visit.
968            stack.push(id);
969            let app = self.describe_application(id, txn_tracker).await?;
970            for child in app.required_application_ids.iter().rev() {
971                if !seen.contains(child) {
972                    stack.push(*child);
973                }
974            }
975        }
976        Ok(result)
977    }
978
979    /// Records a blob that is used in this block. If this is the first use on this chain, creates
980    /// an oracle response for it.
981    pub(crate) async fn blob_used(
982        &mut self,
983        txn_tracker: &mut TransactionTracker,
984        blob_id: BlobId,
985    ) -> Result<bool, ExecutionError> {
986        if self.used_blobs.contains(&blob_id).await? {
987            return Ok(false); // Nothing to do.
988        }
989        self.used_blobs.insert(&blob_id)?;
990        txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
991        Ok(true)
992    }
993
994    /// Records a blob that is published in this block. This does not create an oracle entry, and
995    /// the blob can be used without using an oracle in the future on this chain.
996    fn blob_published(
997        &mut self,
998        blob_id: &BlobId,
999        txn_tracker: &mut TransactionTracker,
1000    ) -> Result<(), ExecutionError> {
1001        self.used_blobs.insert(blob_id)?;
1002        txn_tracker.add_published_blob(*blob_id);
1003        Ok(())
1004    }
1005
1006    pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1007        match self.context().extra().get_blob(blob_id).await {
1008            Ok(Some(blob)) => Ok(Arc::unwrap_or_clone(blob).into()),
1009            Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1010            Err(error) => Err(error.into()),
1011        }
1012    }
1013
1014    pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1015        if self.context().extra().contains_blob(blob_id).await? {
1016            Ok(())
1017        } else {
1018            Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1019        }
1020    }
1021
1022    async fn check_bytecode_blobs(
1023        &mut self,
1024        module_id: &ModuleId,
1025        txn_tracker: &TransactionTracker,
1026    ) -> Result<Vec<BlobId>, ExecutionError> {
1027        let blob_ids = module_id.bytecode_blob_ids();
1028
1029        let mut missing_blobs = Vec::new();
1030        for blob_id in &blob_ids {
1031            // First check if blob is present in created_blobs
1032            if txn_tracker.created_blobs().contains_key(blob_id) {
1033                continue; // Blob found in created_blobs, it's ok
1034            }
1035            // If not in created_blobs, check storage
1036            if !self.context().extra().contains_blob(*blob_id).await? {
1037                missing_blobs.push(*blob_id);
1038            }
1039        }
1040        ensure!(
1041            missing_blobs.is_empty(),
1042            ExecutionError::BlobsNotFound(missing_blobs)
1043        );
1044
1045        Ok(blob_ids)
1046    }
1047}