Skip to main content

linera_execution/
system.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9use std::{
10    collections::{BTreeMap, BTreeSet},
11    sync::Arc,
12};
13
14use allocative::Allocative;
15use custom_debug_derive::Debug;
16use linera_base::{
17    crypto::CryptoHash,
18    data_types::{
19        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
20        ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
21    },
22    ensure, hex_debug,
23    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, StreamId},
24    ownership::{ChainOwnership, TimeoutConfig},
25};
26use linera_views::{
27    context::Context,
28    lazy_register_view::HashedLazyRegisterView,
29    map_view::HashedMapView,
30    register_view::HashedRegisterView,
31    set_view::HashedSetView,
32    views::{ClonableView, HashableView, ReplaceContext, View},
33    ViewError,
34};
35use serde::{Deserialize, Serialize};
36
37#[cfg(test)]
38use crate::test_utils::SystemExecutionState;
39use crate::{
40    committee::Committee, util::OracleResponseExt as _, ApplicationDescription, ApplicationId,
41    ExecutionError, ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext,
42    OutgoingMessage, QueryContext, QueryOutcome, ResourceController, TransactionTracker,
43};
44
45/// The event stream name for new epochs and committees.
46pub static EPOCH_STREAM_NAME: &[u8] = &[0];
47/// The event stream name for removed epochs.
48pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
49
50/// The number of times the [`SystemOperation::OpenChain`] was executed.
51#[cfg(with_metrics)]
52mod metrics {
53    use std::sync::LazyLock;
54
55    use linera_base::prometheus_util::register_int_counter_vec;
56    use prometheus::IntCounterVec;
57
58    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
59        register_int_counter_vec(
60            "open_chain_count",
61            "The number of times the `OpenChain` operation was executed",
62            &[],
63        )
64    });
65}
66
67/// A view accessing the execution state of the system of a chain.
68#[derive(Debug, ClonableView, HashableView, Allocative)]
69#[allocative(bound = "C")]
70pub struct SystemExecutionStateView<C> {
71    /// How the chain was created. May be unknown for inactive chains.
72    pub description: HashedLazyRegisterView<C, Option<ChainDescription>>,
73    /// The number identifying the current configuration.
74    pub epoch: HashedRegisterView<C, Epoch>,
75    /// The admin of the chain.
76    pub admin_chain_id: HashedRegisterView<C, Option<ChainId>>,
77    /// The committees that we trust, indexed by epoch number.
78    // Not using a `MapView` because the set active of committees is supposed to be
79    // small. Plus, currently, we would create the `BTreeMap` anyway in various places
80    // (e.g. the `OpenChain` operation).
81    //
82    // Lazy: committee lookups are served from the process-global `SharedCommittees` cache
83    // (via `Storage::get_or_load_committee`), with a fall-back to this view for the
84    // mid-block case in `current_committee`. This view is only loaded when executing an
85    // operation that mutates the map (admin-chain `CreateCommittee`/`RemoveCommittee`, or
86    // `ProcessNewEpoch`/`ProcessRemovedEpoch`).
87    pub committees: HashedLazyRegisterView<C, BTreeMap<Epoch, Committee>>,
88    /// Ownership of the chain.
89    pub ownership: HashedLazyRegisterView<C, ChainOwnership>,
90    /// Balance of the chain. (Available to any user able to create blocks in the chain.)
91    pub balance: HashedRegisterView<C, Amount>,
92    /// Balances attributed to a given owner.
93    pub balances: HashedMapView<C, AccountOwner, Amount>,
94    /// The timestamp of the most recent block.
95    pub timestamp: HashedRegisterView<C, Timestamp>,
96    /// Whether this chain has been closed.
97    pub closed: HashedRegisterView<C, bool>,
98    /// Permissions for applications on this chain.
99    pub application_permissions: HashedLazyRegisterView<C, ApplicationPermissions>,
100    /// Blobs that have been used or published on this chain.
101    pub used_blobs: HashedSetView<C, BlobId>,
102    /// The event stream subscriptions of applications on this chain.
103    pub event_subscriptions: HashedMapView<C, (ChainId, StreamId), EventSubscriptions>,
104}
105
106impl<C: Context, C2: Context> ReplaceContext<C2> for SystemExecutionStateView<C> {
107    type Target = SystemExecutionStateView<C2>;
108
109    async fn with_context(
110        &mut self,
111        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
112    ) -> Self::Target {
113        SystemExecutionStateView {
114            description: self.description.with_context(ctx.clone()).await,
115            epoch: self.epoch.with_context(ctx.clone()).await,
116            admin_chain_id: self.admin_chain_id.with_context(ctx.clone()).await,
117            committees: self.committees.with_context(ctx.clone()).await,
118            ownership: self.ownership.with_context(ctx.clone()).await,
119            balance: self.balance.with_context(ctx.clone()).await,
120            balances: self.balances.with_context(ctx.clone()).await,
121            timestamp: self.timestamp.with_context(ctx.clone()).await,
122            closed: self.closed.with_context(ctx.clone()).await,
123            application_permissions: self.application_permissions.with_context(ctx.clone()).await,
124            used_blobs: self.used_blobs.with_context(ctx.clone()).await,
125            event_subscriptions: self.event_subscriptions.with_context(ctx.clone()).await,
126        }
127    }
128}
129
130/// The applications subscribing to a particular stream, and the next event index.
131#[derive(Debug, Default, Clone, Serialize, Deserialize, Allocative)]
132pub struct EventSubscriptions {
133    /// The next event index, i.e. the total number of events in this stream that have already
134    /// been processed by this chain.
135    pub next_index: u32,
136    /// The applications that are subscribed to this stream.
137    pub applications: BTreeSet<ApplicationId>,
138}
139
140/// The initial configuration for a new chain.
141#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
142pub struct OpenChainConfig {
143    /// The ownership configuration of the new chain.
144    pub ownership: ChainOwnership,
145    /// The initial chain balance.
146    pub balance: Amount,
147    /// The initial application permissions.
148    pub application_permissions: ApplicationPermissions,
149}
150
151impl OpenChainConfig {
152    /// Creates an [`InitialChainConfig`] based on this [`OpenChainConfig`] and additional
153    /// parameters.
154    pub fn init_chain_config(
155        &self,
156        epoch: Epoch,
157        min_active_epoch: Epoch,
158        max_active_epoch: Epoch,
159    ) -> InitialChainConfig {
160        InitialChainConfig {
161            application_permissions: self.application_permissions.clone(),
162            balance: self.balance,
163            epoch,
164            min_active_epoch,
165            max_active_epoch,
166            ownership: self.ownership.clone(),
167        }
168    }
169}
170
171/// A system operation.
172#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
173pub enum SystemOperation {
174    /// Transfers `amount` units of value from the given owner's account to the recipient.
175    /// If no owner is given, try to take the units out of the unattributed account.
176    Transfer {
177        owner: AccountOwner,
178        recipient: Account,
179        amount: Amount,
180    },
181    /// Claims `amount` units of value from the given owner's account in the remote
182    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
183    /// process the message.
184    Claim {
185        owner: AccountOwner,
186        target_id: ChainId,
187        recipient: Account,
188        amount: Amount,
189    },
190    /// Creates (or activates) a new chain.
191    /// This will automatically subscribe to the future committees created by `admin_chain_id`.
192    OpenChain(OpenChainConfig),
193    /// Closes the chain.
194    CloseChain,
195    /// Changes the ownership of the chain.
196    ChangeOwnership {
197        /// Super owners can propose fast blocks in the first round, and regular blocks in any round.
198        #[debug(skip_if = Vec::is_empty)]
199        super_owners: Vec<AccountOwner>,
200        /// The regular owners, with their weights that determine how often they are round leader.
201        #[debug(skip_if = Vec::is_empty)]
202        owners: Vec<(AccountOwner, u64)>,
203        /// The number of initial rounds after 0 in which all owners are allowed to propose blocks.
204        multi_leader_rounds: u32,
205        /// Whether the multi-leader rounds are unrestricted, i.e. not limited to chain owners.
206        /// This should only be `true` on chains with restrictive application permissions and an
207        /// application-based mechanism to select block proposers.
208        open_multi_leader_rounds: bool,
209        /// The timeout configuration: how long fast, multi-leader and single-leader rounds last.
210        timeout_config: TimeoutConfig,
211    },
212    /// Changes the application permissions configuration on this chain.
213    ChangeApplicationPermissions(ApplicationPermissions),
214    /// Publishes a new application module.
215    PublishModule { module_id: ModuleId },
216    /// Publishes a new data blob.
217    PublishDataBlob { blob_hash: CryptoHash },
218    /// Verifies that the given blob exists. Otherwise the block fails.
219    VerifyBlob { blob_id: BlobId },
220    /// Creates a new application.
221    CreateApplication {
222        module_id: ModuleId,
223        #[serde(with = "serde_bytes")]
224        #[debug(with = "hex_debug")]
225        parameters: Vec<u8>,
226        #[serde(with = "serde_bytes")]
227        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
228        instantiation_argument: Vec<u8>,
229        #[debug(skip_if = Vec::is_empty)]
230        required_application_ids: Vec<ApplicationId>,
231    },
232    /// Operations that are only allowed on the admin chain.
233    Admin(AdminOperation),
234    /// Processes an event about a new epoch and committee.
235    ProcessNewEpoch(Epoch),
236    /// Processes an event about a removed epoch and committee.
237    ProcessRemovedEpoch(Epoch),
238    /// Updates the event stream trackers.
239    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
240}
241
242/// Operations that are only allowed on the admin chain.
243#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
244pub enum AdminOperation {
245    /// Publishes a new committee as a blob. This can be assigned to an epoch using
246    /// [`AdminOperation::CreateCommittee`] in a later block.
247    PublishCommitteeBlob { blob_hash: CryptoHash },
248    /// Registers a new committee. Other chains can then migrate to the new epoch by executing
249    /// [`SystemOperation::ProcessNewEpoch`].
250    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
251    /// Removes a committee. Other chains should execute [`SystemOperation::ProcessRemovedEpoch`],
252    /// so that blocks from the retired epoch will not be accepted until they are followed (hence
253    /// re-certified) by a block certified by a recent committee.
254    RemoveCommittee { epoch: Epoch },
255}
256
257/// A system message meant to be executed on a remote chain.
258#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
259pub enum SystemMessage {
260    /// Credits `amount` units of value to the account `target` -- unless the message is
261    /// bouncing, in which case `source` is credited instead.
262    Credit {
263        target: AccountOwner,
264        amount: Amount,
265        source: AccountOwner,
266    },
267    /// Withdraws `amount` units of value from the account and starts a transfer to credit
268    /// the recipient. The message must be properly authenticated. Receiver chains may
269    /// refuse it depending on their configuration.
270    Withdraw {
271        owner: AccountOwner,
272        amount: Amount,
273        recipient: Account,
274    },
275}
276
277/// A query to the system state.
278#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
279pub struct SystemQuery;
280
281/// The response to a system query.
282#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
283pub struct SystemResponse {
284    pub chain_id: ChainId,
285    pub balance: Amount,
286}
287
288/// Optional user message attached to a transfer.
289#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
290pub struct UserData(pub Option<[u8; 32]>);
291
292#[derive(Debug)]
293pub struct CreateApplicationResult {
294    pub app_id: ApplicationId,
295}
296
297impl<C> SystemExecutionStateView<C>
298where
299    C: Context + Clone + 'static,
300    C::Extra: ExecutionRuntimeContext,
301{
302    /// Invariant for the states of active chains.
303    pub async fn is_active(&self) -> Result<bool, ViewError> {
304        Ok(self.description.get().await?.is_some()
305            && self.ownership.get().await?.is_active()
306            && self.current_committee().await?.is_some()
307            && self.admin_chain_id.get().is_some())
308    }
309
310    /// Returns the chain's current epoch together with its committee.
311    ///
312    /// Serves lookups from the process-global [`crate::SharedCommittees`] cache, falling
313    /// back to the `NewCommittee` event on the admin chain (or the genesis committee blob
314    /// for epoch 0). If neither source has the committee — which happens when a
315    /// `CreateCommittee`/`ProcessNewEpoch` op earlier in the current block created it, but
316    /// the block hasn't been saved yet so the `NewCommittee` event isn't persisted (and
317    /// we deliberately don't populate the shared cache from an unconfirmed block) — fall
318    /// back to reading the chain's own `committees` view, where the new committee sits as
319    /// a pending update.
320    pub async fn current_committee(&self) -> Result<Option<(Epoch, Arc<Committee>)>, ViewError> {
321        let epoch = *self.epoch.get();
322        if let Some(committee) = self.context().extra().get_or_load_committee(epoch).await? {
323            return Ok(Some((epoch, committee)));
324        }
325        let Some(committee) = self.committees.get().await?.get(&epoch).cloned() else {
326            return Ok(None);
327        };
328        Ok(Some((epoch, Arc::new(committee))))
329    }
330
331    async fn get_event(&self, event_id: EventId) -> Result<Arc<Vec<u8>>, ExecutionError> {
332        match self.context().extra().get_event(event_id.clone()).await? {
333            None => Err(ExecutionError::EventsNotFound(vec![event_id])),
334            Some(vec) => Ok(vec),
335        }
336    }
337
338    /// Executes the sender's side of an operation and returns a list of actions to be
339    /// taken.
340    pub async fn execute_operation(
341        &mut self,
342        context: OperationContext,
343        operation: SystemOperation,
344        txn_tracker: &mut TransactionTracker,
345        resource_controller: &mut ResourceController<Option<AccountOwner>>,
346    ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
347        use SystemOperation::*;
348        let mut new_application = None;
349        match operation {
350            OpenChain(config) => {
351                let _chain_id = self
352                    .open_chain(
353                        config,
354                        context.chain_id,
355                        context.height,
356                        context.timestamp,
357                        txn_tracker,
358                    )
359                    .await?;
360                #[cfg(with_metrics)]
361                metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
362            }
363            ChangeOwnership {
364                super_owners,
365                owners,
366                multi_leader_rounds,
367                open_multi_leader_rounds,
368                timeout_config,
369            } => {
370                self.ownership.set(ChainOwnership {
371                    super_owners: super_owners.into_iter().collect(),
372                    owners: owners.into_iter().collect(),
373                    multi_leader_rounds,
374                    open_multi_leader_rounds,
375                    timeout_config,
376                });
377            }
378            ChangeApplicationPermissions(application_permissions) => {
379                self.application_permissions.set(application_permissions);
380            }
381            CloseChain => self.close_chain()?,
382            Transfer {
383                owner,
384                amount,
385                recipient,
386            } => {
387                let maybe_message = self
388                    .transfer(context.authenticated_signer, None, owner, recipient, amount)
389                    .await?;
390                txn_tracker.add_outgoing_messages(maybe_message);
391            }
392            Claim {
393                owner,
394                target_id,
395                recipient,
396                amount,
397            } => {
398                let maybe_message = self
399                    .claim(
400                        context.authenticated_signer,
401                        None,
402                        owner,
403                        target_id,
404                        recipient,
405                        amount,
406                    )
407                    .await?;
408                txn_tracker.add_outgoing_messages(maybe_message);
409            }
410            Admin(admin_operation) => {
411                ensure!(
412                    *self.admin_chain_id.get() == Some(context.chain_id),
413                    ExecutionError::AdminOperationOnNonAdminChain
414                );
415                match admin_operation {
416                    AdminOperation::PublishCommitteeBlob { blob_hash } => {
417                        self.blob_published(
418                            &BlobId::new(blob_hash, BlobType::Committee),
419                            txn_tracker,
420                        )?;
421                    }
422                    AdminOperation::CreateCommittee { epoch, blob_hash } => {
423                        self.check_next_epoch(epoch)?;
424                        let blob_id = BlobId::new(blob_hash, BlobType::Committee);
425                        let committee =
426                            bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
427                        self.blob_used(txn_tracker, blob_id).await?;
428                        self.committees.get_mut().await?.insert(epoch, committee);
429                        self.epoch.set(epoch);
430                        txn_tracker.add_event(
431                            StreamId::system(EPOCH_STREAM_NAME),
432                            epoch.0,
433                            bcs::to_bytes(&blob_hash)?,
434                        );
435                    }
436                    AdminOperation::RemoveCommittee { epoch } => {
437                        ensure!(
438                            self.committees.get_mut().await?.remove(&epoch).is_some(),
439                            ExecutionError::InvalidCommitteeRemoval
440                        );
441                        txn_tracker.add_event(
442                            StreamId::system(REMOVED_EPOCH_STREAM_NAME),
443                            epoch.0,
444                            vec![],
445                        );
446                    }
447                }
448            }
449            PublishModule { module_id } => {
450                for blob_id in module_id.bytecode_blob_ids() {
451                    self.blob_published(&blob_id, txn_tracker)?;
452                }
453            }
454            CreateApplication {
455                module_id,
456                parameters,
457                instantiation_argument,
458                required_application_ids,
459            } => {
460                let CreateApplicationResult { app_id } = self
461                    .create_application(
462                        context.chain_id,
463                        context.height,
464                        module_id,
465                        parameters,
466                        required_application_ids,
467                        txn_tracker,
468                    )
469                    .await?;
470                new_application = Some((app_id, instantiation_argument));
471            }
472            PublishDataBlob { blob_hash } => {
473                self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
474            }
475            VerifyBlob { blob_id } => {
476                self.assert_blob_exists(blob_id).await?;
477                resource_controller
478                    .with_state(self)
479                    .await?
480                    .track_blob_read(0)?;
481                self.blob_used(txn_tracker, blob_id).await?;
482            }
483            ProcessNewEpoch(epoch) => {
484                self.check_next_epoch(epoch)?;
485                let admin_chain_id = self.admin_chain_id.get().ok_or_else(|| {
486                    ExecutionError::InternalError(
487                        "execute_operation called for uninitialized chain",
488                    )
489                })?;
490                let event_id = EventId {
491                    chain_id: admin_chain_id,
492                    stream_id: StreamId::system(EPOCH_STREAM_NAME),
493                    index: epoch.0,
494                };
495                let bytes = txn_tracker
496                    .oracle(|| async {
497                        let bytes = self.get_event(event_id.clone()).await?;
498                        Ok(OracleResponse::Event(
499                            event_id.clone(),
500                            Arc::unwrap_or_clone(bytes),
501                        ))
502                    })
503                    .await?
504                    .to_event(&event_id)?;
505                let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
506                let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
507                self.blob_used(txn_tracker, blob_id).await?;
508                self.committees.get_mut().await?.insert(epoch, committee);
509                self.epoch.set(epoch);
510            }
511            ProcessRemovedEpoch(epoch) => {
512                ensure!(
513                    self.committees.get_mut().await?.remove(&epoch).is_some(),
514                    ExecutionError::InvalidCommitteeRemoval
515                );
516                let admin_chain_id = self.admin_chain_id.get().ok_or_else(|| {
517                    ExecutionError::InternalError(
518                        "execute_operation called for uninitialized chain",
519                    )
520                })?;
521                let event_id = EventId {
522                    chain_id: admin_chain_id,
523                    stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
524                    index: epoch.0,
525                };
526                txn_tracker
527                    .oracle(|| async {
528                        let bytes = self.get_event(event_id.clone()).await?;
529                        Ok(OracleResponse::Event(event_id, Arc::unwrap_or_clone(bytes)))
530                    })
531                    .await?;
532            }
533            UpdateStreams(streams) => {
534                let mut missing_events = Vec::new();
535                for (chain_id, stream_id, next_index) in streams {
536                    let subscriptions = self
537                        .event_subscriptions
538                        .get_mut_or_default(&(chain_id, stream_id.clone()))
539                        .await?;
540                    ensure!(
541                        subscriptions.next_index < next_index,
542                        ExecutionError::OutdatedUpdateStreams
543                    );
544                    for application_id in &subscriptions.applications {
545                        txn_tracker.add_stream_to_process(
546                            *application_id,
547                            chain_id,
548                            stream_id.clone(),
549                            subscriptions.next_index,
550                            next_index,
551                        );
552                    }
553                    subscriptions.next_index = next_index;
554                    let index = next_index
555                        .checked_sub(1)
556                        .ok_or(ArithmeticError::Underflow)?;
557                    let event_id = EventId {
558                        chain_id,
559                        stream_id,
560                        index,
561                    };
562                    let extra = self.context().extra();
563                    txn_tracker
564                        .oracle(|| async {
565                            if !extra.contains_event(event_id.clone()).await? {
566                                missing_events.push(event_id.clone());
567                            }
568                            Ok(OracleResponse::EventExists(event_id))
569                        })
570                        .await?;
571                }
572                ensure!(
573                    missing_events.is_empty(),
574                    ExecutionError::EventsNotFound(missing_events)
575                );
576            }
577        }
578
579        Ok(new_application)
580    }
581
582    /// Returns an error if the `provided` epoch is not exactly one higher than the chain's current
583    /// epoch.
584    fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
585        let expected = self.epoch.get().try_add_one()?;
586        ensure!(
587            provided == expected,
588            ExecutionError::InvalidCommitteeEpoch { provided, expected }
589        );
590        Ok(())
591    }
592
593    async fn credit(&mut self, owner: &AccountOwner, amount: Amount) -> Result<(), ExecutionError> {
594        if owner == &AccountOwner::CHAIN {
595            let new_balance = self.balance.get().saturating_add(amount);
596            self.balance.set(new_balance);
597        } else {
598            let balance = self.balances.get_mut_or_default(owner).await?;
599            *balance = balance.saturating_add(amount);
600        }
601        Ok(())
602    }
603
604    async fn credit_or_send_message(
605        &mut self,
606        source: AccountOwner,
607        recipient: Account,
608        amount: Amount,
609    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
610        let source_chain_id = self.context().extra().chain_id();
611        if recipient.chain_id == source_chain_id {
612            // Handle same-chain transfer locally.
613            let target = recipient.owner;
614            self.credit(&target, amount).await?;
615            Ok(None)
616        } else {
617            // Handle cross-chain transfer with message.
618            let message = SystemMessage::Credit {
619                amount,
620                source,
621                target: recipient.owner,
622            };
623            Ok(Some(
624                OutgoingMessage::new(recipient.chain_id, message).with_kind(MessageKind::Tracked),
625            ))
626        }
627    }
628
629    pub async fn transfer(
630        &mut self,
631        authenticated_signer: Option<AccountOwner>,
632        authenticated_application_id: Option<ApplicationId>,
633        source: AccountOwner,
634        recipient: Account,
635        amount: Amount,
636    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
637        if source == AccountOwner::CHAIN {
638            let authenticated_signer =
639                authenticated_signer.ok_or(ExecutionError::UnauthenticatedTransferOwner)?;
640            ensure!(
641                self.ownership
642                    .get()
643                    .await?
644                    .verify_owner(&authenticated_signer),
645                ExecutionError::UnauthenticatedTransferOwner
646            );
647        } else {
648            ensure!(
649                authenticated_signer == Some(source)
650                    || authenticated_application_id.map(AccountOwner::from) == Some(source),
651                ExecutionError::UnauthenticatedTransferOwner
652            );
653        }
654        ensure!(
655            amount > Amount::ZERO,
656            ExecutionError::IncorrectTransferAmount
657        );
658        self.debit(&source, amount).await?;
659        self.credit_or_send_message(source, recipient, amount).await
660    }
661
662    pub async fn claim(
663        &mut self,
664        authenticated_signer: Option<AccountOwner>,
665        authenticated_application_id: Option<ApplicationId>,
666        source: AccountOwner,
667        target_id: ChainId,
668        recipient: Account,
669        amount: Amount,
670    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
671        ensure!(
672            authenticated_signer == Some(source)
673                || authenticated_application_id.map(AccountOwner::from) == Some(source),
674            ExecutionError::UnauthenticatedClaimOwner
675        );
676        ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
677
678        let current_chain_id = self.context().extra().chain_id();
679        if target_id == current_chain_id {
680            // Handle same-chain claim locally by processing the withdraw operation directly
681            self.debit(&source, amount).await?;
682            self.credit_or_send_message(source, recipient, amount).await
683        } else {
684            // Handle cross-chain claim with Withdraw message
685            let message = SystemMessage::Withdraw {
686                amount,
687                owner: source,
688                recipient,
689            };
690            Ok(Some(
691                OutgoingMessage::new(target_id, message)
692                    .with_authenticated_signer(authenticated_signer),
693            ))
694        }
695    }
696
697    /// Debits an [`Amount`] of tokens from an account's balance.
698    async fn debit(
699        &mut self,
700        account: &AccountOwner,
701        amount: Amount,
702    ) -> Result<(), ExecutionError> {
703        let balance = if account == &AccountOwner::CHAIN {
704            self.balance.get_mut()
705        } else {
706            self.balances.get_mut(account).await?.ok_or_else(|| {
707                ExecutionError::InsufficientBalance {
708                    balance: Amount::ZERO,
709                    account: *account,
710                }
711            })?
712        };
713
714        balance
715            .try_sub_assign(amount)
716            .map_err(|_| ExecutionError::InsufficientBalance {
717                balance: *balance,
718                account: *account,
719            })?;
720
721        if account != &AccountOwner::CHAIN && balance.is_zero() {
722            self.balances.remove(account)?;
723        }
724
725        Ok(())
726    }
727
728    /// Executes a cross-chain message that represents the recipient's side of an operation.
729    pub async fn execute_message(
730        &mut self,
731        context: MessageContext,
732        message: SystemMessage,
733    ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
734        let mut outcome = Vec::new();
735        use SystemMessage::*;
736        match message {
737            Credit {
738                amount,
739                source,
740                target,
741            } => {
742                let receiver = if context.is_bouncing { source } else { target };
743                self.credit(&receiver, amount).await?;
744            }
745            Withdraw {
746                amount,
747                owner,
748                recipient,
749            } => {
750                self.debit(&owner, amount).await?;
751                if let Some(message) = self
752                    .credit_or_send_message(owner, recipient, amount)
753                    .await?
754                {
755                    outcome.push(message);
756                }
757            }
758        }
759        Ok(outcome)
760    }
761
762    /// Initializes the system application state on a newly opened chain.
763    /// Returns `Ok(true)` if the chain was already initialized, `Ok(false)` if it wasn't.
764    pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
765        if self.description.get().await?.is_some() {
766            // already initialized
767            return Ok(true);
768        }
769        let description_blob = self
770            .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
771            .await?;
772        let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
773        let InitialChainConfig {
774            ownership,
775            epoch,
776            balance,
777            min_active_epoch,
778            max_active_epoch,
779            application_permissions,
780        } = description.config().clone();
781        self.timestamp.set(description.timestamp());
782        self.description.set(Some(description));
783        self.epoch.set(epoch);
784        let committees = self
785            .context()
786            .extra()
787            .get_committees(min_active_epoch..=max_active_epoch)
788            .await?;
789        let admin_chain_id = self
790            .context()
791            .extra()
792            .get_network_description()
793            .await?
794            .ok_or(ExecutionError::NoNetworkDescriptionFound)?
795            .admin_chain_id;
796
797        self.committees.set(committees);
798        self.admin_chain_id.set(Some(admin_chain_id));
799        self.ownership.set(ownership);
800        self.balance.set(balance);
801        self.application_permissions.set(application_permissions);
802        Ok(false)
803    }
804
805    pub fn handle_query(
806        &mut self,
807        context: QueryContext,
808        _query: SystemQuery,
809    ) -> Result<QueryOutcome<SystemResponse>, ExecutionError> {
810        let response = SystemResponse {
811            chain_id: context.chain_id,
812            balance: *self.balance.get(),
813        };
814        Ok(QueryOutcome {
815            response,
816            operations: vec![],
817        })
818    }
819
820    /// Returns the messages to open a new chain, and subtracts the new chain's balance
821    /// from this chain's.
822    pub async fn open_chain(
823        &mut self,
824        config: OpenChainConfig,
825        parent: ChainId,
826        block_height: BlockHeight,
827        timestamp: Timestamp,
828        txn_tracker: &mut TransactionTracker,
829    ) -> Result<ChainId, ExecutionError> {
830        let chain_index = txn_tracker.next_chain_index();
831        let chain_origin = ChainOrigin::Child {
832            parent,
833            block_height,
834            chain_index,
835        };
836        let committees = self.committees.get().await?;
837        let init_chain_config = config.init_chain_config(
838            *self.epoch.get(),
839            committees.keys().min().copied().unwrap_or(Epoch::ZERO),
840            committees.keys().max().copied().unwrap_or(Epoch::ZERO),
841        );
842        let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
843        let child_id = chain_description.id();
844        self.debit(&AccountOwner::CHAIN, config.balance).await?;
845        let blob = Blob::new_chain_description(&chain_description);
846        txn_tracker.add_created_blob(blob);
847        Ok(child_id)
848    }
849
850    pub fn close_chain(&mut self) -> Result<(), ExecutionError> {
851        self.closed.set(true);
852        Ok(())
853    }
854
855    pub async fn create_application(
856        &mut self,
857        chain_id: ChainId,
858        block_height: BlockHeight,
859        module_id: ModuleId,
860        parameters: Vec<u8>,
861        required_application_ids: Vec<ApplicationId>,
862        txn_tracker: &mut TransactionTracker,
863    ) -> Result<CreateApplicationResult, ExecutionError> {
864        let application_index = txn_tracker.next_application_index();
865
866        let blob_ids = self.check_bytecode_blobs(&module_id, txn_tracker).await?;
867        // We only remember to register the blobs that aren't recorded in `used_blobs`
868        // already.
869        for blob_id in blob_ids {
870            self.blob_used(txn_tracker, blob_id).await?;
871        }
872
873        let application_description = ApplicationDescription {
874            module_id,
875            creator_chain_id: chain_id,
876            block_height,
877            application_index,
878            parameters,
879            required_application_ids,
880        };
881        self.check_required_applications(&application_description, txn_tracker)
882            .await?;
883
884        let blob = Blob::new_application_description(&application_description);
885        self.used_blobs.insert(&blob.id())?;
886        txn_tracker.add_created_blob(blob);
887
888        Ok(CreateApplicationResult {
889            app_id: ApplicationId::from(&application_description),
890        })
891    }
892
893    async fn check_required_applications(
894        &mut self,
895        application_description: &ApplicationDescription,
896        txn_tracker: &mut TransactionTracker,
897    ) -> Result<(), ExecutionError> {
898        // Make sure that referenced applications IDs have been registered.
899        for required_id in &application_description.required_application_ids {
900            Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
901        }
902        Ok(())
903    }
904
905    /// Retrieves an application's description.
906    pub async fn describe_application(
907        &mut self,
908        id: ApplicationId,
909        txn_tracker: &mut TransactionTracker,
910    ) -> Result<ApplicationDescription, ExecutionError> {
911        let blob_id = id.description_blob_id();
912        let content = match txn_tracker.created_blobs().get(&blob_id) {
913            Some(content) => content.clone(),
914            None => self.read_blob_content(blob_id).await?,
915        };
916        self.blob_used(txn_tracker, blob_id).await?;
917        let description: ApplicationDescription = bcs::from_bytes(content.bytes())?;
918
919        let blob_ids = self
920            .check_bytecode_blobs(&description.module_id, txn_tracker)
921            .await?;
922        // We only remember to register the blobs that aren't recorded in `used_blobs`
923        // already.
924        for blob_id in blob_ids {
925            self.blob_used(txn_tracker, blob_id).await?;
926        }
927
928        self.check_required_applications(&description, txn_tracker)
929            .await?;
930
931        Ok(description)
932    }
933
934    /// Records a blob that is used in this block. If this is the first use on this chain, creates
935    /// an oracle response for it.
936    pub(crate) async fn blob_used(
937        &mut self,
938        txn_tracker: &mut TransactionTracker,
939        blob_id: BlobId,
940    ) -> Result<bool, ExecutionError> {
941        if self.used_blobs.contains(&blob_id).await? {
942            return Ok(false); // Nothing to do.
943        }
944        self.used_blobs.insert(&blob_id)?;
945        txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
946        Ok(true)
947    }
948
949    /// Records a blob that is published in this block. This does not create an oracle entry, and
950    /// the blob can be used without using an oracle in the future on this chain.
951    fn blob_published(
952        &mut self,
953        blob_id: &BlobId,
954        txn_tracker: &mut TransactionTracker,
955    ) -> Result<(), ExecutionError> {
956        self.used_blobs.insert(blob_id)?;
957        txn_tracker.add_published_blob(*blob_id);
958        Ok(())
959    }
960
961    pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
962        match self.context().extra().get_blob(blob_id).await {
963            Ok(Some(blob)) => Ok(Arc::unwrap_or_clone(blob).into()),
964            Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
965            Err(error) => Err(error.into()),
966        }
967    }
968
969    pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
970        if self.context().extra().contains_blob(blob_id).await? {
971            Ok(())
972        } else {
973            Err(ExecutionError::BlobsNotFound(vec![blob_id]))
974        }
975    }
976
977    async fn check_bytecode_blobs(
978        &self,
979        module_id: &ModuleId,
980        txn_tracker: &TransactionTracker,
981    ) -> Result<Vec<BlobId>, ExecutionError> {
982        let blob_ids = module_id.bytecode_blob_ids();
983
984        let mut missing_blobs = Vec::new();
985        for blob_id in &blob_ids {
986            // First check if blob is present in created_blobs
987            if txn_tracker.created_blobs().contains_key(blob_id) {
988                continue; // Blob found in created_blobs, it's ok
989            }
990            // If not in created_blobs, check storage
991            if !self.context().extra().contains_blob(*blob_id).await? {
992                missing_blobs.push(*blob_id);
993            }
994        }
995        ensure!(
996            missing_blobs.is_empty(),
997            ExecutionError::BlobsNotFound(missing_blobs)
998        );
999
1000        Ok(blob_ids)
1001    }
1002}