Skip to main content

linera_execution/
system.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9use std::{
10    collections::{BTreeMap, BTreeSet, HashSet},
11    sync::Arc,
12};
13
14use allocative::Allocative;
15use custom_debug_derive::Debug;
16use linera_base::{
17    crypto::CryptoHash,
18    data_types::{
19        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
20        ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
21    },
22    ensure, hex_debug,
23    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, StreamId},
24    ownership::{ChainOwnership, TimeoutConfig},
25};
26use linera_views::{
27    context::Context,
28    lazy_register_view::HashedLazyRegisterView,
29    map_view::HashedMapView,
30    register_view::HashedRegisterView,
31    set_view::HashedSetView,
32    views::{ClonableView, HashableView, ReplaceContext, View},
33    ViewError,
34};
35use serde::{Deserialize, Serialize};
36
37#[cfg(test)]
38use crate::test_utils::SystemExecutionState;
39use crate::{
40    committee::Committee, util::OracleResponseExt as _, ApplicationDescription, ApplicationId,
41    ExecutionError, ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext,
42    OutgoingMessage, QueryContext, QueryOutcome, ResourceController, TransactionTracker,
43};
44
45/// The event stream name for new epochs and committees.
46pub static EPOCH_STREAM_NAME: &[u8] = &[0];
47/// The event stream name for removed epochs.
48pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
49
50/// The number of times the [`SystemOperation::OpenChain`] was executed.
51#[cfg(with_metrics)]
52mod metrics {
53    use std::sync::LazyLock;
54
55    use linera_base::prometheus_util::register_int_counter_vec;
56    use prometheus::IntCounterVec;
57
58    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
59        register_int_counter_vec(
60            "open_chain_count",
61            "The number of times the `OpenChain` operation was executed",
62            &[],
63        )
64    });
65}
66
67/// A view accessing the execution state of the system of a chain.
68#[derive(Debug, ClonableView, HashableView, Allocative)]
69#[allocative(bound = "C")]
70pub struct SystemExecutionStateView<C> {
71    /// How the chain was created. May be unknown for inactive chains.
72    pub description: HashedLazyRegisterView<C, Option<ChainDescription>>,
73    /// The number identifying the current configuration.
74    pub epoch: HashedRegisterView<C, Epoch>,
75    /// The admin of the chain.
76    pub admin_chain_id: HashedRegisterView<C, Option<ChainId>>,
77    /// The committees that we trust, indexed by epoch number.
78    // Not using a `MapView` because the set active of committees is supposed to be
79    // small. Plus, currently, we would create the `BTreeMap` anyway in various places
80    // (e.g. the `OpenChain` operation).
81    //
82    // Lazy: committee lookups are served from the process-global `SharedCommittees` cache
83    // (via `Storage::get_or_load_committee`), with a fall-back to this view for the
84    // mid-block case in `current_committee`. This view is only loaded when executing an
85    // operation that mutates the map (admin-chain `CreateCommittee`/`RemoveCommittee`, or
86    // `ProcessNewEpoch`/`ProcessRemovedEpoch`).
87    pub committees: HashedLazyRegisterView<C, BTreeMap<Epoch, Committee>>,
88    /// Ownership of the chain.
89    pub ownership: HashedLazyRegisterView<C, ChainOwnership>,
90    /// Balance of the chain. (Available to any user able to create blocks in the chain.)
91    pub balance: HashedRegisterView<C, Amount>,
92    /// Balances attributed to a given owner.
93    pub balances: HashedMapView<C, AccountOwner, Amount>,
94    /// The timestamp of the most recent block.
95    pub timestamp: HashedRegisterView<C, Timestamp>,
96    /// Whether this chain has been closed.
97    pub closed: HashedRegisterView<C, bool>,
98    /// Permissions for applications on this chain.
99    pub application_permissions: HashedLazyRegisterView<C, ApplicationPermissions>,
100    /// Blobs that have been used or published on this chain.
101    pub used_blobs: HashedSetView<C, BlobId>,
102    /// The event stream subscriptions of applications on this chain.
103    pub event_subscriptions: HashedMapView<C, (ChainId, StreamId), EventSubscriptions>,
104}
105
106impl<C: Context, C2: Context> ReplaceContext<C2> for SystemExecutionStateView<C> {
107    type Target = SystemExecutionStateView<C2>;
108
109    async fn with_context(
110        &mut self,
111        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
112    ) -> Self::Target {
113        SystemExecutionStateView {
114            description: self.description.with_context(ctx.clone()).await,
115            epoch: self.epoch.with_context(ctx.clone()).await,
116            admin_chain_id: self.admin_chain_id.with_context(ctx.clone()).await,
117            committees: self.committees.with_context(ctx.clone()).await,
118            ownership: self.ownership.with_context(ctx.clone()).await,
119            balance: self.balance.with_context(ctx.clone()).await,
120            balances: self.balances.with_context(ctx.clone()).await,
121            timestamp: self.timestamp.with_context(ctx.clone()).await,
122            closed: self.closed.with_context(ctx.clone()).await,
123            application_permissions: self.application_permissions.with_context(ctx.clone()).await,
124            used_blobs: self.used_blobs.with_context(ctx.clone()).await,
125            event_subscriptions: self.event_subscriptions.with_context(ctx.clone()).await,
126        }
127    }
128}
129
130/// The applications subscribing to a particular stream, and the next event index.
131#[derive(Debug, Default, Clone, Serialize, Deserialize, Allocative)]
132pub struct EventSubscriptions {
133    /// The next event index, i.e. the total number of events in this stream that have already
134    /// been processed by this chain.
135    pub next_index: u32,
136    /// The applications that are subscribed to this stream.
137    pub applications: BTreeSet<ApplicationId>,
138}
139
140/// The initial configuration for a new chain.
141#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
142pub struct OpenChainConfig {
143    /// The ownership configuration of the new chain.
144    pub ownership: ChainOwnership,
145    /// The initial chain balance.
146    pub balance: Amount,
147    /// The initial application permissions.
148    pub application_permissions: ApplicationPermissions,
149}
150
151impl OpenChainConfig {
152    /// Creates an [`InitialChainConfig`] based on this [`OpenChainConfig`] and additional
153    /// parameters.
154    pub fn init_chain_config(
155        &self,
156        epoch: Epoch,
157        min_active_epoch: Epoch,
158        max_active_epoch: Epoch,
159    ) -> InitialChainConfig {
160        InitialChainConfig {
161            application_permissions: self.application_permissions.clone(),
162            balance: self.balance,
163            epoch,
164            min_active_epoch,
165            max_active_epoch,
166            ownership: self.ownership.clone(),
167        }
168    }
169}
170
171/// A system operation.
172#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
173pub enum SystemOperation {
174    /// Transfers `amount` units of value from the given owner's account to the recipient.
175    /// If no owner is given, try to take the units out of the unattributed account.
176    Transfer {
177        owner: AccountOwner,
178        recipient: Account,
179        amount: Amount,
180    },
181    /// Claims `amount` units of value from the given owner's account in the remote
182    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
183    /// process the message.
184    Claim {
185        owner: AccountOwner,
186        target_id: ChainId,
187        recipient: Account,
188        amount: Amount,
189    },
190    /// Creates (or activates) a new chain.
191    /// This will automatically subscribe to the future committees created by `admin_chain_id`.
192    OpenChain(OpenChainConfig),
193    /// Closes the chain.
194    CloseChain,
195    /// Changes the ownership of the chain.
196    ChangeOwnership {
197        /// Super owners can propose fast blocks in the first round, and regular blocks in any round.
198        #[debug(skip_if = Vec::is_empty)]
199        super_owners: Vec<AccountOwner>,
200        /// The regular owners, with their weights that determine how often they are round leader.
201        #[debug(skip_if = Vec::is_empty)]
202        owners: Vec<(AccountOwner, u64)>,
203        /// The number of initial rounds after 0 in which all owners are allowed to propose blocks.
204        multi_leader_rounds: u32,
205        /// Whether the multi-leader rounds are unrestricted, i.e. not limited to chain owners.
206        /// This should only be `true` on chains with restrictive application permissions and an
207        /// application-based mechanism to select block proposers.
208        open_multi_leader_rounds: bool,
209        /// The timeout configuration: how long fast, multi-leader and single-leader rounds last.
210        timeout_config: TimeoutConfig,
211    },
212    /// Changes the application permissions configuration on this chain.
213    ChangeApplicationPermissions(ApplicationPermissions),
214    /// Publishes a new application module.
215    PublishModule { module_id: ModuleId },
216    /// Publishes a new data blob.
217    PublishDataBlob { blob_hash: CryptoHash },
218    /// Verifies that the given blob exists. Otherwise the block fails.
219    VerifyBlob { blob_id: BlobId },
220    /// Creates a new application.
221    CreateApplication {
222        module_id: ModuleId,
223        #[serde(with = "serde_bytes")]
224        #[debug(with = "hex_debug")]
225        parameters: Vec<u8>,
226        #[serde(with = "serde_bytes")]
227        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
228        instantiation_argument: Vec<u8>,
229        #[debug(skip_if = Vec::is_empty)]
230        required_application_ids: Vec<ApplicationId>,
231    },
232    /// Operations that are only allowed on the admin chain.
233    Admin(AdminOperation),
234    /// Processes an event about a new epoch and committee.
235    ProcessNewEpoch(Epoch),
236    /// Processes an event about a removed epoch and committee.
237    ProcessRemovedEpoch(Epoch),
238    /// Updates the event stream trackers.
239    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
240}
241
242/// Operations that are only allowed on the admin chain.
243#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
244pub enum AdminOperation {
245    /// Publishes a new committee as a blob. This can be assigned to an epoch using
246    /// [`AdminOperation::CreateCommittee`] in a later block.
247    PublishCommitteeBlob { blob_hash: CryptoHash },
248    /// Registers a new committee. Other chains can then migrate to the new epoch by executing
249    /// [`SystemOperation::ProcessNewEpoch`].
250    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
251    /// Removes a committee. Other chains should execute [`SystemOperation::ProcessRemovedEpoch`],
252    /// so that blocks from the retired epoch will not be accepted until they are followed (hence
253    /// re-certified) by a block certified by a recent committee.
254    RemoveCommittee { epoch: Epoch },
255}
256
257/// A system message meant to be executed on a remote chain.
258#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
259pub enum SystemMessage {
260    /// Credits `amount` units of value to the account `target` -- unless the message is
261    /// bouncing, in which case `source` is credited instead.
262    Credit {
263        target: AccountOwner,
264        amount: Amount,
265        source: AccountOwner,
266    },
267    /// Withdraws `amount` units of value from the account and starts a transfer to credit
268    /// the recipient. The message must be properly authenticated. Receiver chains may
269    /// refuse it depending on their configuration.
270    Withdraw {
271        owner: AccountOwner,
272        amount: Amount,
273        recipient: Account,
274    },
275}
276
277/// A query to the system state.
278#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
279pub struct SystemQuery;
280
281/// The response to a system query.
282#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
283pub struct SystemResponse {
284    pub chain_id: ChainId,
285    pub balance: Amount,
286}
287
288/// Optional user message attached to a transfer.
289#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
290pub struct UserData(pub Option<[u8; 32]>);
291
292impl UserData {
293    pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
294        // Convert the Option<String> to Option<[u8; 32]>
295        let option_array = match opt_str {
296            Some(s) => {
297                // Convert the String to a Vec<u8>
298                let vec = s.into_bytes();
299                if vec.len() <= 32 {
300                    // Create an array from the Vec<u8>
301                    let mut array = [b' '; 32];
302
303                    // Copy bytes from the vector into the array
304                    let len = vec.len().min(32);
305                    array[..len].copy_from_slice(&vec[..len]);
306
307                    Some(array)
308                } else {
309                    return Err(vec.len());
310                }
311            }
312            None => None,
313        };
314
315        // Return the UserData with the converted Option<[u8; 32]>
316        Ok(UserData(option_array))
317    }
318}
319
320#[derive(Debug)]
321pub struct CreateApplicationResult {
322    pub app_id: ApplicationId,
323}
324
325impl<C> SystemExecutionStateView<C>
326where
327    C: Context + Clone + 'static,
328    C::Extra: ExecutionRuntimeContext,
329{
330    /// Invariant for the states of active chains.
331    pub async fn is_active(&self) -> Result<bool, ViewError> {
332        Ok(self.description.get().await?.is_some()
333            && self.ownership.get().await?.is_active()
334            && self.current_committee().await?.is_some()
335            && self.admin_chain_id.get().is_some())
336    }
337
338    /// Returns the chain's current epoch together with its committee.
339    ///
340    /// Serves lookups from the process-global [`crate::SharedCommittees`] cache, falling
341    /// back to the `NewCommittee` event on the admin chain (or the genesis committee blob
342    /// for epoch 0). If neither source has the committee — which happens when a
343    /// `CreateCommittee`/`ProcessNewEpoch` op earlier in the current block created it, but
344    /// the block hasn't been saved yet so the `NewCommittee` event isn't persisted (and
345    /// we deliberately don't populate the shared cache from an unconfirmed block) — fall
346    /// back to reading the chain's own `committees` view, where the new committee sits as
347    /// a pending update.
348    pub async fn current_committee(&self) -> Result<Option<(Epoch, Arc<Committee>)>, ViewError> {
349        let epoch = *self.epoch.get();
350        if let Some(committee) = self.context().extra().get_or_load_committee(epoch).await? {
351            return Ok(Some((epoch, committee)));
352        }
353        let Some(committee) = self.committees.get().await?.get(&epoch).cloned() else {
354            return Ok(None);
355        };
356        Ok(Some((epoch, Arc::new(committee))))
357    }
358
359    async fn get_event(&self, event_id: EventId) -> Result<Arc<Vec<u8>>, ExecutionError> {
360        match self.context().extra().get_event(event_id.clone()).await? {
361            None => Err(ExecutionError::EventsNotFound(vec![event_id])),
362            Some(vec) => Ok(vec),
363        }
364    }
365
366    /// Executes the sender's side of an operation and returns a list of actions to be
367    /// taken.
368    pub async fn execute_operation(
369        &mut self,
370        context: OperationContext,
371        operation: SystemOperation,
372        txn_tracker: &mut TransactionTracker,
373        resource_controller: &mut ResourceController<Option<AccountOwner>>,
374    ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
375        use SystemOperation::*;
376        let mut new_application = None;
377        match operation {
378            OpenChain(config) => {
379                let _chain_id = self
380                    .open_chain(
381                        config,
382                        context.chain_id,
383                        context.height,
384                        context.timestamp,
385                        txn_tracker,
386                    )
387                    .await?;
388                #[cfg(with_metrics)]
389                metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
390            }
391            ChangeOwnership {
392                super_owners,
393                owners,
394                multi_leader_rounds,
395                open_multi_leader_rounds,
396                timeout_config,
397            } => {
398                self.ownership.set(ChainOwnership {
399                    super_owners: super_owners.into_iter().collect(),
400                    owners: owners.into_iter().collect(),
401                    multi_leader_rounds,
402                    open_multi_leader_rounds,
403                    timeout_config,
404                });
405            }
406            ChangeApplicationPermissions(application_permissions) => {
407                self.application_permissions.set(application_permissions);
408            }
409            CloseChain => self.close_chain()?,
410            Transfer {
411                owner,
412                amount,
413                recipient,
414            } => {
415                let maybe_message = self
416                    .transfer(context.authenticated_signer, None, owner, recipient, amount)
417                    .await?;
418                txn_tracker.add_outgoing_messages(maybe_message);
419            }
420            Claim {
421                owner,
422                target_id,
423                recipient,
424                amount,
425            } => {
426                let maybe_message = self
427                    .claim(
428                        context.authenticated_signer,
429                        None,
430                        owner,
431                        target_id,
432                        recipient,
433                        amount,
434                    )
435                    .await?;
436                txn_tracker.add_outgoing_messages(maybe_message);
437            }
438            Admin(admin_operation) => {
439                ensure!(
440                    *self.admin_chain_id.get() == Some(context.chain_id),
441                    ExecutionError::AdminOperationOnNonAdminChain
442                );
443                match admin_operation {
444                    AdminOperation::PublishCommitteeBlob { blob_hash } => {
445                        self.blob_published(
446                            &BlobId::new(blob_hash, BlobType::Committee),
447                            txn_tracker,
448                        )?;
449                    }
450                    AdminOperation::CreateCommittee { epoch, blob_hash } => {
451                        self.check_next_epoch(epoch)?;
452                        let blob_id = BlobId::new(blob_hash, BlobType::Committee);
453                        let committee =
454                            bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
455                        self.blob_used(txn_tracker, blob_id).await?;
456                        self.committees.get_mut().await?.insert(epoch, committee);
457                        self.epoch.set(epoch);
458                        txn_tracker.add_event(
459                            StreamId::system(EPOCH_STREAM_NAME),
460                            epoch.0,
461                            bcs::to_bytes(&blob_hash)?,
462                        );
463                    }
464                    AdminOperation::RemoveCommittee { epoch } => {
465                        ensure!(
466                            self.committees.get_mut().await?.remove(&epoch).is_some(),
467                            ExecutionError::InvalidCommitteeRemoval
468                        );
469                        txn_tracker.add_event(
470                            StreamId::system(REMOVED_EPOCH_STREAM_NAME),
471                            epoch.0,
472                            vec![],
473                        );
474                    }
475                }
476            }
477            PublishModule { module_id } => {
478                for blob_id in module_id.bytecode_blob_ids() {
479                    self.blob_published(&blob_id, txn_tracker)?;
480                }
481            }
482            CreateApplication {
483                module_id,
484                parameters,
485                instantiation_argument,
486                required_application_ids,
487            } => {
488                let CreateApplicationResult { app_id } = self
489                    .create_application(
490                        context.chain_id,
491                        context.height,
492                        module_id,
493                        parameters,
494                        required_application_ids,
495                        txn_tracker,
496                    )
497                    .await?;
498                new_application = Some((app_id, instantiation_argument));
499            }
500            PublishDataBlob { blob_hash } => {
501                self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
502            }
503            VerifyBlob { blob_id } => {
504                self.assert_blob_exists(blob_id).await?;
505                resource_controller
506                    .with_state(self)
507                    .await?
508                    .track_blob_read(0)?;
509                self.blob_used(txn_tracker, blob_id).await?;
510            }
511            ProcessNewEpoch(epoch) => {
512                self.check_next_epoch(epoch)?;
513                let admin_chain_id = self
514                    .admin_chain_id
515                    .get()
516                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
517                let event_id = EventId {
518                    chain_id: admin_chain_id,
519                    stream_id: StreamId::system(EPOCH_STREAM_NAME),
520                    index: epoch.0,
521                };
522                let bytes = txn_tracker
523                    .oracle(|| async {
524                        let bytes = self.get_event(event_id.clone()).await?;
525                        Ok(OracleResponse::Event(
526                            event_id.clone(),
527                            Arc::unwrap_or_clone(bytes),
528                        ))
529                    })
530                    .await?
531                    .to_event(&event_id)?;
532                let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
533                let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
534                self.blob_used(txn_tracker, blob_id).await?;
535                self.committees.get_mut().await?.insert(epoch, committee);
536                self.epoch.set(epoch);
537            }
538            ProcessRemovedEpoch(epoch) => {
539                ensure!(
540                    self.committees.get_mut().await?.remove(&epoch).is_some(),
541                    ExecutionError::InvalidCommitteeRemoval
542                );
543                let admin_chain_id = self
544                    .admin_chain_id
545                    .get()
546                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
547                let event_id = EventId {
548                    chain_id: admin_chain_id,
549                    stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
550                    index: epoch.0,
551                };
552                txn_tracker
553                    .oracle(|| async {
554                        let bytes = self.get_event(event_id.clone()).await?;
555                        Ok(OracleResponse::Event(event_id, Arc::unwrap_or_clone(bytes)))
556                    })
557                    .await?;
558            }
559            UpdateStreams(streams) => {
560                let mut missing_events = Vec::new();
561                for (chain_id, stream_id, next_index) in streams {
562                    let subscriptions = self
563                        .event_subscriptions
564                        .get_mut_or_default(&(chain_id, stream_id.clone()))
565                        .await?;
566                    ensure!(
567                        subscriptions.next_index < next_index,
568                        ExecutionError::OutdatedUpdateStreams
569                    );
570                    for application_id in &subscriptions.applications {
571                        txn_tracker.add_stream_to_process(
572                            *application_id,
573                            chain_id,
574                            stream_id.clone(),
575                            subscriptions.next_index,
576                            next_index,
577                        );
578                    }
579                    subscriptions.next_index = next_index;
580                    let index = next_index
581                        .checked_sub(1)
582                        .ok_or(ArithmeticError::Underflow)?;
583                    let event_id = EventId {
584                        chain_id,
585                        stream_id,
586                        index,
587                    };
588                    let extra = self.context().extra();
589                    txn_tracker
590                        .oracle(|| async {
591                            if !extra.contains_event(event_id.clone()).await? {
592                                missing_events.push(event_id.clone());
593                            }
594                            Ok(OracleResponse::EventExists(event_id))
595                        })
596                        .await?;
597                }
598                ensure!(
599                    missing_events.is_empty(),
600                    ExecutionError::EventsNotFound(missing_events)
601                );
602            }
603        }
604
605        Ok(new_application)
606    }
607
608    /// Returns an error if the `provided` epoch is not exactly one higher than the chain's current
609    /// epoch.
610    fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
611        let expected = self.epoch.get().try_add_one()?;
612        ensure!(
613            provided == expected,
614            ExecutionError::InvalidCommitteeEpoch { provided, expected }
615        );
616        Ok(())
617    }
618
619    async fn credit(&mut self, owner: &AccountOwner, amount: Amount) -> Result<(), ExecutionError> {
620        if owner == &AccountOwner::CHAIN {
621            let new_balance = self.balance.get().saturating_add(amount);
622            self.balance.set(new_balance);
623        } else {
624            let balance = self.balances.get_mut_or_default(owner).await?;
625            *balance = balance.saturating_add(amount);
626        }
627        Ok(())
628    }
629
630    async fn credit_or_send_message(
631        &mut self,
632        source: AccountOwner,
633        recipient: Account,
634        amount: Amount,
635    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
636        let source_chain_id = self.context().extra().chain_id();
637        if recipient.chain_id == source_chain_id {
638            // Handle same-chain transfer locally.
639            let target = recipient.owner;
640            self.credit(&target, amount).await?;
641            Ok(None)
642        } else {
643            // Handle cross-chain transfer with message.
644            let message = SystemMessage::Credit {
645                amount,
646                source,
647                target: recipient.owner,
648            };
649            Ok(Some(
650                OutgoingMessage::new(recipient.chain_id, message).with_kind(MessageKind::Tracked),
651            ))
652        }
653    }
654
655    pub async fn transfer(
656        &mut self,
657        authenticated_signer: Option<AccountOwner>,
658        authenticated_application_id: Option<ApplicationId>,
659        source: AccountOwner,
660        recipient: Account,
661        amount: Amount,
662    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
663        if source == AccountOwner::CHAIN {
664            ensure!(
665                authenticated_signer.is_some()
666                    && self
667                        .ownership
668                        .get()
669                        .await?
670                        .verify_owner(&authenticated_signer.unwrap()),
671                ExecutionError::UnauthenticatedTransferOwner
672            );
673        } else {
674            ensure!(
675                authenticated_signer == Some(source)
676                    || authenticated_application_id.map(AccountOwner::from) == Some(source),
677                ExecutionError::UnauthenticatedTransferOwner
678            );
679        }
680        ensure!(
681            amount > Amount::ZERO,
682            ExecutionError::IncorrectTransferAmount
683        );
684        self.debit(&source, amount).await?;
685        self.credit_or_send_message(source, recipient, amount).await
686    }
687
688    pub async fn claim(
689        &mut self,
690        authenticated_signer: Option<AccountOwner>,
691        authenticated_application_id: Option<ApplicationId>,
692        source: AccountOwner,
693        target_id: ChainId,
694        recipient: Account,
695        amount: Amount,
696    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
697        ensure!(
698            authenticated_signer == Some(source)
699                || authenticated_application_id.map(AccountOwner::from) == Some(source),
700            ExecutionError::UnauthenticatedClaimOwner
701        );
702        ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
703
704        let current_chain_id = self.context().extra().chain_id();
705        if target_id == current_chain_id {
706            // Handle same-chain claim locally by processing the withdraw operation directly
707            self.debit(&source, amount).await?;
708            self.credit_or_send_message(source, recipient, amount).await
709        } else {
710            // Handle cross-chain claim with Withdraw message
711            let message = SystemMessage::Withdraw {
712                amount,
713                owner: source,
714                recipient,
715            };
716            Ok(Some(
717                OutgoingMessage::new(target_id, message)
718                    .with_authenticated_signer(authenticated_signer),
719            ))
720        }
721    }
722
723    /// Debits an [`Amount`] of tokens from an account's balance.
724    async fn debit(
725        &mut self,
726        account: &AccountOwner,
727        amount: Amount,
728    ) -> Result<(), ExecutionError> {
729        let balance = if account == &AccountOwner::CHAIN {
730            self.balance.get_mut()
731        } else {
732            self.balances.get_mut(account).await?.ok_or_else(|| {
733                ExecutionError::InsufficientBalance {
734                    balance: Amount::ZERO,
735                    account: *account,
736                }
737            })?
738        };
739
740        balance
741            .try_sub_assign(amount)
742            .map_err(|_| ExecutionError::InsufficientBalance {
743                balance: *balance,
744                account: *account,
745            })?;
746
747        if account != &AccountOwner::CHAIN && balance.is_zero() {
748            self.balances.remove(account)?;
749        }
750
751        Ok(())
752    }
753
754    /// Executes a cross-chain message that represents the recipient's side of an operation.
755    pub async fn execute_message(
756        &mut self,
757        context: MessageContext,
758        message: SystemMessage,
759    ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
760        let mut outcome = Vec::new();
761        use SystemMessage::*;
762        match message {
763            Credit {
764                amount,
765                source,
766                target,
767            } => {
768                let receiver = if context.is_bouncing { source } else { target };
769                self.credit(&receiver, amount).await?;
770            }
771            Withdraw {
772                amount,
773                owner,
774                recipient,
775            } => {
776                self.debit(&owner, amount).await?;
777                if let Some(message) = self
778                    .credit_or_send_message(owner, recipient, amount)
779                    .await?
780                {
781                    outcome.push(message);
782                }
783            }
784        }
785        Ok(outcome)
786    }
787
788    /// Initializes the system application state on a newly opened chain.
789    /// Returns `Ok(true)` if the chain was already initialized, `Ok(false)` if it wasn't.
790    pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
791        if self.description.get().await?.is_some() {
792            // already initialized
793            return Ok(true);
794        }
795        let description_blob = self
796            .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
797            .await?;
798        let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
799        let InitialChainConfig {
800            ownership,
801            epoch,
802            balance,
803            min_active_epoch,
804            max_active_epoch,
805            application_permissions,
806        } = description.config().clone();
807        self.timestamp.set(description.timestamp());
808        self.description.set(Some(description));
809        self.epoch.set(epoch);
810        let committees = self
811            .context()
812            .extra()
813            .get_committees(min_active_epoch..=max_active_epoch)
814            .await?;
815        let admin_chain_id = self
816            .context()
817            .extra()
818            .get_network_description()
819            .await?
820            .ok_or(ExecutionError::NoNetworkDescriptionFound)?
821            .admin_chain_id;
822
823        self.committees.set(committees);
824        self.admin_chain_id.set(Some(admin_chain_id));
825        self.ownership.set(ownership);
826        self.balance.set(balance);
827        self.application_permissions.set(application_permissions);
828        Ok(false)
829    }
830
831    pub fn handle_query(
832        &mut self,
833        context: QueryContext,
834        _query: SystemQuery,
835    ) -> Result<QueryOutcome<SystemResponse>, ExecutionError> {
836        let response = SystemResponse {
837            chain_id: context.chain_id,
838            balance: *self.balance.get(),
839        };
840        Ok(QueryOutcome {
841            response,
842            operations: vec![],
843        })
844    }
845
846    /// Returns the messages to open a new chain, and subtracts the new chain's balance
847    /// from this chain's.
848    pub async fn open_chain(
849        &mut self,
850        config: OpenChainConfig,
851        parent: ChainId,
852        block_height: BlockHeight,
853        timestamp: Timestamp,
854        txn_tracker: &mut TransactionTracker,
855    ) -> Result<ChainId, ExecutionError> {
856        let chain_index = txn_tracker.next_chain_index();
857        let chain_origin = ChainOrigin::Child {
858            parent,
859            block_height,
860            chain_index,
861        };
862        let committees = self.committees.get().await?;
863        let init_chain_config = config.init_chain_config(
864            *self.epoch.get(),
865            committees.keys().min().copied().unwrap_or(Epoch::ZERO),
866            committees.keys().max().copied().unwrap_or(Epoch::ZERO),
867        );
868        let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
869        let child_id = chain_description.id();
870        self.debit(&AccountOwner::CHAIN, config.balance).await?;
871        let blob = Blob::new_chain_description(&chain_description);
872        txn_tracker.add_created_blob(blob);
873        Ok(child_id)
874    }
875
876    pub fn close_chain(&mut self) -> Result<(), ExecutionError> {
877        self.closed.set(true);
878        Ok(())
879    }
880
881    pub async fn create_application(
882        &mut self,
883        chain_id: ChainId,
884        block_height: BlockHeight,
885        module_id: ModuleId,
886        parameters: Vec<u8>,
887        required_application_ids: Vec<ApplicationId>,
888        txn_tracker: &mut TransactionTracker,
889    ) -> Result<CreateApplicationResult, ExecutionError> {
890        let application_index = txn_tracker.next_application_index();
891
892        let blob_ids = self.check_bytecode_blobs(&module_id, txn_tracker).await?;
893        // We only remember to register the blobs that aren't recorded in `used_blobs`
894        // already.
895        for blob_id in blob_ids {
896            self.blob_used(txn_tracker, blob_id).await?;
897        }
898
899        let application_description = ApplicationDescription {
900            module_id,
901            creator_chain_id: chain_id,
902            block_height,
903            application_index,
904            parameters,
905            required_application_ids,
906        };
907        self.check_required_applications(&application_description, txn_tracker)
908            .await?;
909
910        let blob = Blob::new_application_description(&application_description);
911        self.used_blobs.insert(&blob.id())?;
912        txn_tracker.add_created_blob(blob);
913
914        Ok(CreateApplicationResult {
915            app_id: ApplicationId::from(&application_description),
916        })
917    }
918
919    async fn check_required_applications(
920        &mut self,
921        application_description: &ApplicationDescription,
922        txn_tracker: &mut TransactionTracker,
923    ) -> Result<(), ExecutionError> {
924        // Make sure that referenced applications IDs have been registered.
925        for required_id in &application_description.required_application_ids {
926            Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
927        }
928        Ok(())
929    }
930
931    /// Retrieves an application's description.
932    pub async fn describe_application(
933        &mut self,
934        id: ApplicationId,
935        txn_tracker: &mut TransactionTracker,
936    ) -> Result<ApplicationDescription, ExecutionError> {
937        let blob_id = id.description_blob_id();
938        let content = match txn_tracker.created_blobs().get(&blob_id) {
939            Some(content) => content.clone(),
940            None => self.read_blob_content(blob_id).await?,
941        };
942        self.blob_used(txn_tracker, blob_id).await?;
943        let description: ApplicationDescription = bcs::from_bytes(content.bytes())?;
944
945        let blob_ids = self
946            .check_bytecode_blobs(&description.module_id, txn_tracker)
947            .await?;
948        // We only remember to register the blobs that aren't recorded in `used_blobs`
949        // already.
950        for blob_id in blob_ids {
951            self.blob_used(txn_tracker, blob_id).await?;
952        }
953
954        self.check_required_applications(&description, txn_tracker)
955            .await?;
956
957        Ok(description)
958    }
959
960    /// Retrieves the recursive dependencies of applications and applies a topological sort.
961    pub async fn find_dependencies(
962        &mut self,
963        mut stack: Vec<ApplicationId>,
964        txn_tracker: &mut TransactionTracker,
965    ) -> Result<Vec<ApplicationId>, ExecutionError> {
966        // What we return at the end.
967        let mut result = Vec::new();
968        // The entries already inserted in `result`.
969        let mut sorted = HashSet::new();
970        // The entries for which dependencies have already been pushed once to the stack.
971        let mut seen = HashSet::new();
972
973        while let Some(id) = stack.pop() {
974            if sorted.contains(&id) {
975                continue;
976            }
977            if seen.contains(&id) {
978                // Second time we see this entry. It was last pushed just before its
979                // dependencies -- which are now fully sorted.
980                sorted.insert(id);
981                result.push(id);
982                continue;
983            }
984            // First time we see this entry:
985            // 1. Mark it so that its dependencies are no longer pushed to the stack.
986            seen.insert(id);
987            // 2. Schedule all the (yet unseen) dependencies, then this entry for a second visit.
988            stack.push(id);
989            let app = self.describe_application(id, txn_tracker).await?;
990            for child in app.required_application_ids.iter().rev() {
991                if !seen.contains(child) {
992                    stack.push(*child);
993                }
994            }
995        }
996        Ok(result)
997    }
998
999    /// Records a blob that is used in this block. If this is the first use on this chain, creates
1000    /// an oracle response for it.
1001    pub(crate) async fn blob_used(
1002        &mut self,
1003        txn_tracker: &mut TransactionTracker,
1004        blob_id: BlobId,
1005    ) -> Result<bool, ExecutionError> {
1006        if self.used_blobs.contains(&blob_id).await? {
1007            return Ok(false); // Nothing to do.
1008        }
1009        self.used_blobs.insert(&blob_id)?;
1010        txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
1011        Ok(true)
1012    }
1013
1014    /// Records a blob that is published in this block. This does not create an oracle entry, and
1015    /// the blob can be used without using an oracle in the future on this chain.
1016    fn blob_published(
1017        &mut self,
1018        blob_id: &BlobId,
1019        txn_tracker: &mut TransactionTracker,
1020    ) -> Result<(), ExecutionError> {
1021        self.used_blobs.insert(blob_id)?;
1022        txn_tracker.add_published_blob(*blob_id);
1023        Ok(())
1024    }
1025
1026    pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1027        match self.context().extra().get_blob(blob_id).await {
1028            Ok(Some(blob)) => Ok(Arc::unwrap_or_clone(blob).into()),
1029            Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1030            Err(error) => Err(error.into()),
1031        }
1032    }
1033
1034    pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1035        if self.context().extra().contains_blob(blob_id).await? {
1036            Ok(())
1037        } else {
1038            Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1039        }
1040    }
1041
1042    async fn check_bytecode_blobs(
1043        &mut self,
1044        module_id: &ModuleId,
1045        txn_tracker: &TransactionTracker,
1046    ) -> Result<Vec<BlobId>, ExecutionError> {
1047        let blob_ids = module_id.bytecode_blob_ids();
1048
1049        let mut missing_blobs = Vec::new();
1050        for blob_id in &blob_ids {
1051            // First check if blob is present in created_blobs
1052            if txn_tracker.created_blobs().contains_key(blob_id) {
1053                continue; // Blob found in created_blobs, it's ok
1054            }
1055            // If not in created_blobs, check storage
1056            if !self.context().extra().contains_blob(*blob_id).await? {
1057                missing_blobs.push(*blob_id);
1058            }
1059        }
1060        ensure!(
1061            missing_blobs.is_empty(),
1062            ExecutionError::BlobsNotFound(missing_blobs)
1063        );
1064
1065        Ok(blob_ids)
1066    }
1067}