linera_execution/
system.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9use std::{
10    collections::{BTreeMap, BTreeSet, HashSet},
11    mem,
12};
13
14use custom_debug_derive::Debug;
15use linera_base::{
16    crypto::CryptoHash,
17    data_types::{
18        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
19        ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
20    },
21    ensure, hex_debug,
22    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, StreamId},
23    ownership::{ChainOwnership, TimeoutConfig},
24};
25use linera_views::{
26    context::Context,
27    map_view::HashedMapView,
28    register_view::HashedRegisterView,
29    set_view::HashedSetView,
30    views::{ClonableView, HashableView, ReplaceContext, View},
31};
32use serde::{Deserialize, Serialize};
33
34#[cfg(test)]
35use crate::test_utils::SystemExecutionState;
36use crate::{
37    committee::Committee, ApplicationDescription, ApplicationId, ExecutionError,
38    ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext, OutgoingMessage,
39    QueryContext, QueryOutcome, ResourceController, TransactionTracker,
40};
41
/// The event stream name for new epochs and committees.
///
/// Events published on this stream by `AdminOperation::CreateCommittee` are indexed by the
/// epoch number and carry the BCS-serialized hash of the committee blob.
pub static EPOCH_STREAM_NAME: &[u8] = &[0];
/// The event stream name for removed epochs.
///
/// Events published on this stream by `AdminOperation::RemoveCommittee` are indexed by the
/// removed epoch number and carry an empty payload.
pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
46
/// Prometheus metrics for system operations. Only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// The number of times the `SystemOperation::OpenChain` operation was executed.
    ///
    /// The counter is registered without any labels.
    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "open_chain_count",
            "The number of times the `OpenChain` operation was executed",
            &[],
        )
    });
}
63
/// A view accessing the execution state of the system of a chain.
#[derive(Debug, ClonableView, HashableView)]
pub struct SystemExecutionStateView<C> {
    /// How the chain was created. May be unknown for inactive chains.
    pub description: HashedRegisterView<C, Option<ChainDescription>>,
    /// The number identifying the current configuration.
    pub epoch: HashedRegisterView<C, Epoch>,
    /// The admin of the chain.
    pub admin_id: HashedRegisterView<C, Option<ChainId>>,
    /// The committees that we trust, indexed by epoch number.
    // Not using a `MapView` because the set of active committees is supposed to be
    // small. Plus, currently, we would create the `BTreeMap` anyway in various places
    // (e.g. the `OpenChain` operation).
    pub committees: HashedRegisterView<C, BTreeMap<Epoch, Committee>>,
    /// Ownership of the chain.
    pub ownership: HashedRegisterView<C, ChainOwnership>,
    /// Balance of the chain. (Available to any user able to create blocks in the chain.)
    pub balance: HashedRegisterView<C, Amount>,
    /// Balances attributed to a given owner.
    pub balances: HashedMapView<C, AccountOwner, Amount>,
    /// The timestamp of the most recent block.
    pub timestamp: HashedRegisterView<C, Timestamp>,
    /// Whether this chain has been closed.
    pub closed: HashedRegisterView<C, bool>,
    /// Permissions for applications on this chain.
    pub application_permissions: HashedRegisterView<C, ApplicationPermissions>,
    /// Blobs that have been used or published on this chain.
    pub used_blobs: HashedSetView<C, BlobId>,
    /// The event stream subscriptions of applications on this chain, keyed by the publishing
    /// chain and the stream.
    pub event_subscriptions: HashedMapView<C, (ChainId, StreamId), EventSubscriptions>,
}
95
96impl<C: Context, C2: Context> ReplaceContext<C2> for SystemExecutionStateView<C> {
97    type Target = SystemExecutionStateView<C2>;
98
99    async fn with_context(
100        &mut self,
101        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
102    ) -> Self::Target {
103        SystemExecutionStateView {
104            description: self.description.with_context(ctx.clone()).await,
105            epoch: self.epoch.with_context(ctx.clone()).await,
106            admin_id: self.admin_id.with_context(ctx.clone()).await,
107            committees: self.committees.with_context(ctx.clone()).await,
108            ownership: self.ownership.with_context(ctx.clone()).await,
109            balance: self.balance.with_context(ctx.clone()).await,
110            balances: self.balances.with_context(ctx.clone()).await,
111            timestamp: self.timestamp.with_context(ctx.clone()).await,
112            closed: self.closed.with_context(ctx.clone()).await,
113            application_permissions: self.application_permissions.with_context(ctx.clone()).await,
114            used_blobs: self.used_blobs.with_context(ctx.clone()).await,
115            event_subscriptions: self.event_subscriptions.with_context(ctx.clone()).await,
116        }
117    }
118}
119
/// The applications subscribing to a particular stream, and the next event index.
///
/// The default value has `next_index` 0 and no subscribed applications.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct EventSubscriptions {
    /// The next event index, i.e. the total number of events in this stream that have already
    /// been processed by this chain.
    pub next_index: u32,
    /// The applications that are subscribed to this stream.
    pub applications: BTreeSet<ApplicationId>,
}
129
/// The initial configuration for a new chain.
///
/// This is the payload of [`SystemOperation::OpenChain`].
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct OpenChainConfig {
    /// The ownership configuration of the new chain.
    pub ownership: ChainOwnership,
    /// The initial chain balance.
    pub balance: Amount,
    /// The initial application permissions.
    pub application_permissions: ApplicationPermissions,
}
140
141impl OpenChainConfig {
142    /// Creates an [`InitialChainConfig`] based on this [`OpenChainConfig`] and additional
143    /// parameters.
144    pub fn init_chain_config(
145        &self,
146        epoch: Epoch,
147        min_active_epoch: Epoch,
148        max_active_epoch: Epoch,
149    ) -> InitialChainConfig {
150        InitialChainConfig {
151            application_permissions: self.application_permissions.clone(),
152            balance: self.balance,
153            epoch,
154            min_active_epoch,
155            max_active_epoch,
156            ownership: self.ownership.clone(),
157        }
158    }
159}
160
/// A system operation.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemOperation {
    /// Transfers `amount` units of value from the given owner's account to the recipient.
    /// If `owner` is `AccountOwner::CHAIN`, the units are taken out of the chain's own
    /// balance, which requires the authenticated signer to be an owner of the chain.
    Transfer {
        /// The account to debit on this chain.
        owner: AccountOwner,
        /// The account to credit, possibly on another chain.
        recipient: Account,
        /// The number of units to transfer. Must be greater than zero.
        amount: Amount,
    },
    /// Claims `amount` units of value from the given owner's account in the remote
    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
    /// process the message.
    Claim {
        /// The account to debit on the target chain.
        owner: AccountOwner,
        /// The chain holding the account to debit.
        target_id: ChainId,
        /// The account to credit with the claimed units.
        recipient: Account,
        /// The number of units to claim. Must be greater than zero.
        amount: Amount,
    },
    /// Creates (or activates) a new chain.
    /// This will automatically subscribe to the future committees created by `admin_id`.
    OpenChain(OpenChainConfig),
    /// Closes the chain.
    CloseChain,
    /// Changes the ownership of the chain.
    ChangeOwnership {
        /// Super owners can propose fast blocks in the first round, and regular blocks in any round.
        #[debug(skip_if = Vec::is_empty)]
        super_owners: Vec<AccountOwner>,
        /// The regular owners, with their weights that determine how often they are round leader.
        #[debug(skip_if = Vec::is_empty)]
        owners: Vec<(AccountOwner, u64)>,
        /// The number of initial rounds after 0 in which all owners are allowed to propose blocks.
        multi_leader_rounds: u32,
        /// Whether the multi-leader rounds are unrestricted, i.e. not limited to chain owners.
        /// This should only be `true` on chains with restrictive application permissions and an
        /// application-based mechanism to select block proposers.
        open_multi_leader_rounds: bool,
        /// The timeout configuration: how long fast, multi-leader and single-leader rounds last.
        timeout_config: TimeoutConfig,
    },
    /// Changes the application permissions configuration on this chain.
    ChangeApplicationPermissions(ApplicationPermissions),
    /// Publishes a new application module.
    PublishModule { module_id: ModuleId },
    /// Publishes a new data blob.
    PublishDataBlob { blob_hash: CryptoHash },
    /// Verifies that the given blob exists. Otherwise the block fails.
    VerifyBlob { blob_id: BlobId },
    /// Creates a new application.
    CreateApplication {
        /// The module the application is created from.
        module_id: ModuleId,
        /// The application's parameters, as opaque bytes.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug")]
        parameters: Vec<u8>,
        /// The argument for instantiating the application, as opaque bytes. It is returned by
        /// `execute_operation` together with the new application ID.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
        instantiation_argument: Vec<u8>,
        /// The applications that the new application requires.
        #[debug(skip_if = Vec::is_empty)]
        required_application_ids: Vec<ApplicationId>,
    },
    /// Operations that are only allowed on the admin chain.
    Admin(AdminOperation),
    /// Processes an event about a new epoch and committee. The epoch must be exactly one
    /// higher than the chain's current epoch.
    ProcessNewEpoch(Epoch),
    /// Processes an event about a removed epoch and committee.
    ProcessRemovedEpoch(Epoch),
    /// Updates the event stream trackers. Each entry is the publishing chain, the stream, and
    /// the new next event index, which must be higher than the currently tracked one.
    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
}
231
/// Operations that are only allowed on the admin chain.
///
/// Executing one of these on a chain whose `admin_id` is not the chain itself fails with
/// `ExecutionError::AdminOperationOnNonAdminChain`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum AdminOperation {
    /// Publishes a new committee as a blob. This can be assigned to an epoch using
    /// [`AdminOperation::CreateCommittee`] in a later block.
    PublishCommitteeBlob { blob_hash: CryptoHash },
    /// Registers a new committee. Other chains can then migrate to the new epoch by executing
    /// [`SystemOperation::ProcessNewEpoch`].
    ///
    /// `epoch` must be exactly one higher than the admin chain's current epoch, and `blob_hash`
    /// identifies the committee blob to read.
    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
    /// Removes a committee. Other chains should execute [`SystemOperation::ProcessRemovedEpoch`],
    /// so that blocks from the retired epoch will not be accepted until they are followed (hence
    /// re-certified) by a block certified by a recent committee.
    RemoveCommittee { epoch: Epoch },
}
246
/// A system message meant to be executed on a remote chain.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum SystemMessage {
    /// Credits `amount` units of value to the account `target` -- unless the message is
    /// bouncing, in which case `source` is credited instead.
    Credit {
        /// The account to credit on the receiving chain.
        target: AccountOwner,
        /// The number of units to credit.
        amount: Amount,
        /// The account that was debited; it is credited back if the message bounces.
        source: AccountOwner,
    },
    /// Withdraws `amount` units of value from the account and starts a transfer to credit
    /// the recipient. The message must be properly authenticated. Receiver chains may
    /// refuse it depending on their configuration.
    Withdraw {
        /// The account to debit on the receiving chain.
        owner: AccountOwner,
        /// The number of units to withdraw.
        amount: Amount,
        /// The account to credit with the withdrawn units.
        recipient: Account,
    },
}
266
/// A query to the system state.
///
/// This is a unit struct: the query carries no parameters.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemQuery;
270
/// The response to a system query.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemResponse {
    /// The ID of a chain (presumably the queried chain; confirm against the query handler).
    pub chain_id: ChainId,
    /// A balance (presumably the chain's own balance; confirm against the query handler).
    pub balance: Amount,
}
277
/// Optional user message attached to a transfer.
///
/// When present, the message is stored as exactly 32 bytes; see
/// [`UserData::from_option_string`] for how shorter strings are padded.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
pub struct UserData(pub Option<[u8; 32]>);
281
282impl UserData {
283    pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
284        // Convert the Option<String> to Option<[u8; 32]>
285        let option_array = match opt_str {
286            Some(s) => {
287                // Convert the String to a Vec<u8>
288                let vec = s.into_bytes();
289                if vec.len() <= 32 {
290                    // Create an array from the Vec<u8>
291                    let mut array = [b' '; 32];
292
293                    // Copy bytes from the vector into the array
294                    let len = vec.len().min(32);
295                    array[..len].copy_from_slice(&vec[..len]);
296
297                    Some(array)
298                } else {
299                    return Err(vec.len());
300                }
301            }
302            None => None,
303        };
304
305        // Return the UserData with the converted Option<[u8; 32]>
306        Ok(UserData(option_array))
307    }
308}
309
/// The result of creating an application: the new application's ID together with the
/// transaction tracker that was passed in by value.
#[derive(Debug)]
pub struct CreateApplicationResult {
    /// The ID of the newly created application.
    pub app_id: ApplicationId,
    /// The transaction tracker, handed back to the caller.
    pub txn_tracker: TransactionTracker,
}
315
316impl<C> SystemExecutionStateView<C>
317where
318    C: Context + Clone + Send + Sync + 'static,
319    C::Extra: ExecutionRuntimeContext,
320{
321    /// Invariant for the states of active chains.
322    pub fn is_active(&self) -> bool {
323        self.description.get().is_some()
324            && self.ownership.get().is_active()
325            && self.current_committee().is_some()
326            && self.admin_id.get().is_some()
327    }
328
329    /// Returns the current committee, if any.
330    pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
331        let epoch = self.epoch.get();
332        let committee = self.committees.get().get(epoch)?;
333        Some((*epoch, committee))
334    }
335
336    async fn get_event(&self, event_id: EventId) -> Result<Vec<u8>, ExecutionError> {
337        match self.context().extra().get_event(event_id.clone()).await? {
338            None => Err(ExecutionError::EventsNotFound(vec![event_id])),
339            Some(vec) => Ok(vec),
340        }
341    }
342
    /// Executes the sender's side of an operation and returns a list of actions to be
    /// taken.
    ///
    /// Outgoing messages, events, oracle responses and streams to process are recorded in
    /// `txn_tracker`. The return value is `Some((application_id, instantiation_argument))`
    /// only for [`SystemOperation::CreateApplication`], and `None` for every other operation.
    ///
    /// `resource_controller` is only used by [`SystemOperation::VerifyBlob`], to track a blob
    /// read.
    pub async fn execute_operation(
        &mut self,
        context: OperationContext,
        operation: SystemOperation,
        txn_tracker: &mut TransactionTracker,
        resource_controller: &mut ResourceController<Option<AccountOwner>>,
    ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
        use SystemOperation::*;
        let mut new_application = None;
        match operation {
            OpenChain(config) => {
                let _chain_id = self
                    .open_chain(
                        config,
                        context.chain_id,
                        context.height,
                        context.timestamp,
                        txn_tracker,
                    )
                    .await?;
                #[cfg(with_metrics)]
                metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
            }
            ChangeOwnership {
                super_owners,
                owners,
                multi_leader_rounds,
                open_multi_leader_rounds,
                timeout_config,
            } => {
                // The new ownership fully replaces the previous one.
                self.ownership.set(ChainOwnership {
                    super_owners: super_owners.into_iter().collect(),
                    owners: owners.into_iter().collect(),
                    multi_leader_rounds,
                    open_multi_leader_rounds,
                    timeout_config,
                });
            }
            ChangeApplicationPermissions(application_permissions) => {
                self.application_permissions.set(application_permissions);
            }
            CloseChain => self.close_chain().await?,
            Transfer {
                owner,
                amount,
                recipient,
            } => {
                // System operations are never authenticated by an application, hence `None`.
                let maybe_message = self
                    .transfer(context.authenticated_signer, None, owner, recipient, amount)
                    .await?;
                txn_tracker.add_outgoing_messages(maybe_message);
            }
            Claim {
                owner,
                target_id,
                recipient,
                amount,
            } => {
                // System operations are never authenticated by an application, hence `None`.
                let maybe_message = self
                    .claim(
                        context.authenticated_signer,
                        None,
                        owner,
                        target_id,
                        recipient,
                        amount,
                    )
                    .await?;
                txn_tracker.add_outgoing_messages(maybe_message);
            }
            Admin(admin_operation) => {
                // Admin operations are only valid on the chain that is its own admin chain.
                ensure!(
                    *self.admin_id.get() == Some(context.chain_id),
                    ExecutionError::AdminOperationOnNonAdminChain
                );
                match admin_operation {
                    AdminOperation::PublishCommitteeBlob { blob_hash } => {
                        self.blob_published(
                            &BlobId::new(blob_hash, BlobType::Committee),
                            txn_tracker,
                        )?;
                    }
                    AdminOperation::CreateCommittee { epoch, blob_hash } => {
                        self.check_next_epoch(epoch)?;
                        let blob_id = BlobId::new(blob_hash, BlobType::Committee);
                        let committee =
                            bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
                        self.blob_used(txn_tracker, blob_id).await?;
                        self.committees.get_mut().insert(epoch, committee);
                        self.epoch.set(epoch);
                        // Announce the new epoch: the event index is the epoch number and the
                        // value is the serialized committee blob hash.
                        txn_tracker.add_event(
                            StreamId::system(EPOCH_STREAM_NAME),
                            epoch.0,
                            bcs::to_bytes(&blob_hash)?,
                        );
                    }
                    AdminOperation::RemoveCommittee { epoch } => {
                        // Removing an epoch that has no registered committee is an error.
                        ensure!(
                            self.committees.get_mut().remove(&epoch).is_some(),
                            ExecutionError::InvalidCommitteeRemoval
                        );
                        txn_tracker.add_event(
                            StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                            epoch.0,
                            vec![],
                        );
                    }
                }
            }
            PublishModule { module_id } => {
                for blob_id in module_id.bytecode_blob_ids() {
                    self.blob_published(&blob_id, txn_tracker)?;
                }
            }
            CreateApplication {
                module_id,
                parameters,
                instantiation_argument,
                required_application_ids,
            } => {
                // `create_application` takes the tracker by value, so it is moved out here
                // (leaving a default in its place) and put back from the result below.
                let txn_tracker_moved = mem::take(txn_tracker);
                let CreateApplicationResult {
                    app_id,
                    txn_tracker: txn_tracker_moved,
                } = self
                    .create_application(
                        context.chain_id,
                        context.height,
                        module_id,
                        parameters,
                        required_application_ids,
                        txn_tracker_moved,
                    )
                    .await?;
                *txn_tracker = txn_tracker_moved;
                new_application = Some((app_id, instantiation_argument));
            }
            PublishDataBlob { blob_hash } => {
                self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
            }
            VerifyBlob { blob_id } => {
                self.assert_blob_exists(blob_id).await?;
                // The blob content is not read here, so a read of size 0 is tracked.
                resource_controller
                    .with_state(self)
                    .await?
                    .track_blob_read(0)?;
                self.blob_used(txn_tracker, blob_id).await?;
            }
            ProcessNewEpoch(epoch) => {
                self.check_next_epoch(epoch)?;
                let admin_id = self
                    .admin_id
                    .get()
                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
                // The event published by the admin chain's `CreateCommittee` for this epoch.
                let event_id = EventId {
                    chain_id: admin_id,
                    stream_id: StreamId::system(EPOCH_STREAM_NAME),
                    index: epoch.0,
                };
                // If the tracker supplies a recorded oracle response, it must be for exactly
                // this event; otherwise the event is fetched through the runtime context.
                let bytes = match txn_tracker.next_replayed_oracle_response()? {
                    None => self.get_event(event_id.clone()).await?,
                    Some(OracleResponse::Event(recorded_event_id, bytes))
                        if recorded_event_id == event_id =>
                    {
                        bytes
                    }
                    Some(_) => return Err(ExecutionError::OracleResponseMismatch),
                };
                // The event value is the serialized hash of the committee blob.
                let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
                txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
                let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
                self.blob_used(txn_tracker, blob_id).await?;
                self.committees.get_mut().insert(epoch, committee);
                self.epoch.set(epoch);
            }
            ProcessRemovedEpoch(epoch) => {
                // Removing an epoch that has no registered committee is an error.
                ensure!(
                    self.committees.get_mut().remove(&epoch).is_some(),
                    ExecutionError::InvalidCommitteeRemoval
                );
                let admin_id = self
                    .admin_id
                    .get()
                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
                // The event published by the admin chain's `RemoveCommittee` for this epoch.
                let event_id = EventId {
                    chain_id: admin_id,
                    stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                    index: epoch.0,
                };
                // Same oracle handling as for `ProcessNewEpoch`; the event value itself is not
                // interpreted, only recorded.
                let bytes = match txn_tracker.next_replayed_oracle_response()? {
                    None => self.get_event(event_id.clone()).await?,
                    Some(OracleResponse::Event(recorded_event_id, bytes))
                        if recorded_event_id == event_id =>
                    {
                        bytes
                    }
                    Some(_) => return Err(ExecutionError::OracleResponseMismatch),
                };
                txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
            }
            UpdateStreams(streams) => {
                // Missing events are collected so that all of them are reported in one error.
                let mut missing_events = Vec::new();
                for (chain_id, stream_id, next_index) in streams {
                    let subscriptions = self
                        .event_subscriptions
                        .get_mut_or_default(&(chain_id, stream_id.clone()))
                        .await?;
                    // The tracker may only move forward.
                    ensure!(
                        subscriptions.next_index < next_index,
                        ExecutionError::OutdatedUpdateStreams
                    );
                    // Every subscribed application is scheduled to process the index range
                    // from the previous `next_index` to the new one.
                    for application_id in &subscriptions.applications {
                        txn_tracker.add_stream_to_process(
                            *application_id,
                            chain_id,
                            stream_id.clone(),
                            subscriptions.next_index,
                            next_index,
                        );
                    }
                    subscriptions.next_index = next_index;
                    // Only the last event below the new `next_index` is checked for existence.
                    let index = next_index
                        .checked_sub(1)
                        .ok_or(ArithmeticError::Underflow)?;
                    let event_id = EventId {
                        chain_id,
                        stream_id,
                        index,
                    };
                    match txn_tracker.next_replayed_oracle_response()? {
                        None => {
                            if !self
                                .context()
                                .extra()
                                .contains_event(event_id.clone())
                                .await?
                            {
                                // No oracle response is recorded for a missing event; the
                                // operation fails after the loop.
                                missing_events.push(event_id);
                                continue;
                            }
                        }
                        Some(OracleResponse::EventExists(recorded_event_id))
                            if recorded_event_id == event_id => {}
                        Some(_) => return Err(ExecutionError::OracleResponseMismatch),
                    }
                    txn_tracker.add_oracle_response(OracleResponse::EventExists(event_id));
                }
                ensure!(
                    missing_events.is_empty(),
                    ExecutionError::EventsNotFound(missing_events)
                );
            }
        }

        Ok(new_application)
    }
601
602    /// Returns an error if the `provided` epoch is not exactly one higher than the chain's current
603    /// epoch.
604    fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
605        let expected = self.epoch.get().try_add_one()?;
606        ensure!(
607            provided == expected,
608            ExecutionError::InvalidCommitteeEpoch { provided, expected }
609        );
610        Ok(())
611    }
612
613    async fn credit(&mut self, owner: &AccountOwner, amount: Amount) -> Result<(), ExecutionError> {
614        if owner == &AccountOwner::CHAIN {
615            let new_balance = self.balance.get().saturating_add(amount);
616            self.balance.set(new_balance);
617        } else {
618            let balance = self.balances.get_mut_or_default(owner).await?;
619            *balance = balance.saturating_add(amount);
620        }
621        Ok(())
622    }
623
624    async fn credit_or_send_message(
625        &mut self,
626        source: AccountOwner,
627        recipient: Account,
628        amount: Amount,
629    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
630        let source_chain_id = self.context().extra().chain_id();
631        if recipient.chain_id == source_chain_id {
632            // Handle same-chain transfer locally.
633            let target = recipient.owner;
634            self.credit(&target, amount).await?;
635            Ok(None)
636        } else {
637            // Handle cross-chain transfer with message.
638            let message = SystemMessage::Credit {
639                amount,
640                source,
641                target: recipient.owner,
642            };
643            Ok(Some(
644                OutgoingMessage::new(recipient.chain_id, message).with_kind(MessageKind::Tracked),
645            ))
646        }
647    }
648
649    pub async fn transfer(
650        &mut self,
651        authenticated_signer: Option<AccountOwner>,
652        authenticated_application_id: Option<ApplicationId>,
653        source: AccountOwner,
654        recipient: Account,
655        amount: Amount,
656    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
657        if source == AccountOwner::CHAIN {
658            ensure!(
659                authenticated_signer.is_some()
660                    && self
661                        .ownership
662                        .get()
663                        .verify_owner(&authenticated_signer.unwrap()),
664                ExecutionError::UnauthenticatedTransferOwner
665            );
666        } else {
667            ensure!(
668                authenticated_signer == Some(source)
669                    || authenticated_application_id.map(AccountOwner::from) == Some(source),
670                ExecutionError::UnauthenticatedTransferOwner
671            );
672        }
673        ensure!(
674            amount > Amount::ZERO,
675            ExecutionError::IncorrectTransferAmount
676        );
677        self.debit(&source, amount).await?;
678        self.credit_or_send_message(source, recipient, amount).await
679    }
680
681    pub async fn claim(
682        &mut self,
683        authenticated_signer: Option<AccountOwner>,
684        authenticated_application_id: Option<ApplicationId>,
685        source: AccountOwner,
686        target_id: ChainId,
687        recipient: Account,
688        amount: Amount,
689    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
690        ensure!(
691            authenticated_signer == Some(source)
692                || authenticated_application_id.map(AccountOwner::from) == Some(source),
693            ExecutionError::UnauthenticatedClaimOwner
694        );
695        ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
696
697        let current_chain_id = self.context().extra().chain_id();
698        if target_id == current_chain_id {
699            // Handle same-chain claim locally by processing the withdraw operation directly
700            self.debit(&source, amount).await?;
701            self.credit_or_send_message(source, recipient, amount).await
702        } else {
703            // Handle cross-chain claim with Withdraw message
704            let message = SystemMessage::Withdraw {
705                amount,
706                owner: source,
707                recipient,
708            };
709            Ok(Some(
710                OutgoingMessage::new(target_id, message)
711                    .with_authenticated_signer(authenticated_signer),
712            ))
713        }
714    }
715
716    /// Debits an [`Amount`] of tokens from an account's balance.
717    async fn debit(
718        &mut self,
719        account: &AccountOwner,
720        amount: Amount,
721    ) -> Result<(), ExecutionError> {
722        let balance = if account == &AccountOwner::CHAIN {
723            self.balance.get_mut()
724        } else {
725            self.balances.get_mut(account).await?.ok_or_else(|| {
726                ExecutionError::InsufficientBalance {
727                    balance: Amount::ZERO,
728                    account: *account,
729                }
730            })?
731        };
732
733        balance
734            .try_sub_assign(amount)
735            .map_err(|_| ExecutionError::InsufficientBalance {
736                balance: *balance,
737                account: *account,
738            })?;
739
740        if account != &AccountOwner::CHAIN && balance.is_zero() {
741            self.balances.remove(account)?;
742        }
743
744        Ok(())
745    }
746
747    /// Executes a cross-chain message that represents the recipient's side of an operation.
748    pub async fn execute_message(
749        &mut self,
750        context: MessageContext,
751        message: SystemMessage,
752    ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
753        let mut outcome = Vec::new();
754        use SystemMessage::*;
755        match message {
756            Credit {
757                amount,
758                source,
759                target,
760            } => {
761                let receiver = if context.is_bouncing { source } else { target };
762                self.credit(&receiver, amount).await?;
763            }
764            Withdraw {
765                amount,
766                owner,
767                recipient,
768            } => {
769                self.debit(&owner, amount).await?;
770                if let Some(message) = self
771                    .credit_or_send_message(owner, recipient, amount)
772                    .await?
773                {
774                    outcome.push(message);
775                }
776            }
777        }
778        Ok(outcome)
779    }
780
781    /// Initializes the system application state on a newly opened chain.
782    /// Returns `Ok(true)` if the chain was already initialized, `Ok(false)` if it wasn't.
783    pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
784        if self.description.get().is_some() {
785            // already initialized
786            return Ok(true);
787        }
788        let description_blob = self
789            .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
790            .await?;
791        let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
792        let InitialChainConfig {
793            ownership,
794            epoch,
795            balance,
796            min_active_epoch,
797            max_active_epoch,
798            application_permissions,
799        } = description.config().clone();
800        self.timestamp.set(description.timestamp());
801        self.description.set(Some(description));
802        self.epoch.set(epoch);
803        let committees = self
804            .context()
805            .extra()
806            .committees_for(min_active_epoch..=max_active_epoch)
807            .await?;
808        self.committees.set(committees);
809        let admin_id = self
810            .context()
811            .extra()
812            .get_network_description()
813            .await?
814            .ok_or(ExecutionError::NoNetworkDescriptionFound)?
815            .admin_chain_id;
816        self.admin_id.set(Some(admin_id));
817        self.ownership.set(ownership);
818        self.balance.set(balance);
819        self.application_permissions.set(application_permissions);
820        Ok(false)
821    }
822
823    pub async fn handle_query(
824        &mut self,
825        context: QueryContext,
826        _query: SystemQuery,
827    ) -> Result<QueryOutcome<SystemResponse>, ExecutionError> {
828        let response = SystemResponse {
829            chain_id: context.chain_id,
830            balance: *self.balance.get(),
831        };
832        Ok(QueryOutcome {
833            response,
834            operations: vec![],
835        })
836    }
837
838    /// Returns the messages to open a new chain, and subtracts the new chain's balance
839    /// from this chain's.
840    pub async fn open_chain(
841        &mut self,
842        config: OpenChainConfig,
843        parent: ChainId,
844        block_height: BlockHeight,
845        timestamp: Timestamp,
846        txn_tracker: &mut TransactionTracker,
847    ) -> Result<ChainId, ExecutionError> {
848        let chain_index = txn_tracker.next_chain_index();
849        let chain_origin = ChainOrigin::Child {
850            parent,
851            block_height,
852            chain_index,
853        };
854        let init_chain_config = config.init_chain_config(
855            *self.epoch.get(),
856            self.committees
857                .get()
858                .keys()
859                .min()
860                .copied()
861                .unwrap_or(Epoch::ZERO),
862            self.committees
863                .get()
864                .keys()
865                .max()
866                .copied()
867                .unwrap_or(Epoch::ZERO),
868        );
869        let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
870        let child_id = chain_description.id();
871        self.debit(&AccountOwner::CHAIN, config.balance).await?;
872        let blob = Blob::new_chain_description(&chain_description);
873        txn_tracker.add_created_blob(blob);
874        Ok(child_id)
875    }
876
877    pub async fn close_chain(&mut self) -> Result<(), ExecutionError> {
878        self.closed.set(true);
879        Ok(())
880    }
881
882    pub async fn create_application(
883        &mut self,
884        chain_id: ChainId,
885        block_height: BlockHeight,
886        module_id: ModuleId,
887        parameters: Vec<u8>,
888        required_application_ids: Vec<ApplicationId>,
889        mut txn_tracker: TransactionTracker,
890    ) -> Result<CreateApplicationResult, ExecutionError> {
891        let application_index = txn_tracker.next_application_index();
892
893        let blob_ids = self.check_bytecode_blobs(&module_id, &txn_tracker).await?;
894        // We only remember to register the blobs that aren't recorded in `used_blobs`
895        // already.
896        for blob_id in blob_ids {
897            self.blob_used(&mut txn_tracker, blob_id).await?;
898        }
899
900        let application_description = ApplicationDescription {
901            module_id,
902            creator_chain_id: chain_id,
903            block_height,
904            application_index,
905            parameters,
906            required_application_ids,
907        };
908        self.check_required_applications(&application_description, &mut txn_tracker)
909            .await?;
910
911        let blob = Blob::new_application_description(&application_description);
912        self.used_blobs.insert(&blob.id())?;
913        txn_tracker.add_created_blob(blob);
914
915        Ok(CreateApplicationResult {
916            app_id: ApplicationId::from(&application_description),
917            txn_tracker,
918        })
919    }
920
921    async fn check_required_applications(
922        &mut self,
923        application_description: &ApplicationDescription,
924        txn_tracker: &mut TransactionTracker,
925    ) -> Result<(), ExecutionError> {
926        // Make sure that referenced applications IDs have been registered.
927        for required_id in &application_description.required_application_ids {
928            Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
929        }
930        Ok(())
931    }
932
933    /// Retrieves an application's description.
934    pub async fn describe_application(
935        &mut self,
936        id: ApplicationId,
937        txn_tracker: &mut TransactionTracker,
938    ) -> Result<ApplicationDescription, ExecutionError> {
939        let blob_id = id.description_blob_id();
940        let content = match txn_tracker.created_blobs().get(&blob_id) {
941            Some(content) => content.clone(),
942            None => self.read_blob_content(blob_id).await?,
943        };
944        self.blob_used(txn_tracker, blob_id).await?;
945        let description: ApplicationDescription = bcs::from_bytes(content.bytes())?;
946
947        let blob_ids = self
948            .check_bytecode_blobs(&description.module_id, txn_tracker)
949            .await?;
950        // We only remember to register the blobs that aren't recorded in `used_blobs`
951        // already.
952        for blob_id in blob_ids {
953            self.blob_used(txn_tracker, blob_id).await?;
954        }
955
956        self.check_required_applications(&description, txn_tracker)
957            .await?;
958
959        Ok(description)
960    }
961
962    /// Retrieves the recursive dependencies of applications and applies a topological sort.
963    pub async fn find_dependencies(
964        &mut self,
965        mut stack: Vec<ApplicationId>,
966        txn_tracker: &mut TransactionTracker,
967    ) -> Result<Vec<ApplicationId>, ExecutionError> {
968        // What we return at the end.
969        let mut result = Vec::new();
970        // The entries already inserted in `result`.
971        let mut sorted = HashSet::new();
972        // The entries for which dependencies have already been pushed once to the stack.
973        let mut seen = HashSet::new();
974
975        while let Some(id) = stack.pop() {
976            if sorted.contains(&id) {
977                continue;
978            }
979            if seen.contains(&id) {
980                // Second time we see this entry. It was last pushed just before its
981                // dependencies -- which are now fully sorted.
982                sorted.insert(id);
983                result.push(id);
984                continue;
985            }
986            // First time we see this entry:
987            // 1. Mark it so that its dependencies are no longer pushed to the stack.
988            seen.insert(id);
989            // 2. Schedule all the (yet unseen) dependencies, then this entry for a second visit.
990            stack.push(id);
991            let app = self.describe_application(id, txn_tracker).await?;
992            for child in app.required_application_ids.iter().rev() {
993                if !seen.contains(child) {
994                    stack.push(*child);
995                }
996            }
997        }
998        Ok(result)
999    }
1000
1001    /// Records a blob that is used in this block. If this is the first use on this chain, creates
1002    /// an oracle response for it.
1003    pub(crate) async fn blob_used(
1004        &mut self,
1005        txn_tracker: &mut TransactionTracker,
1006        blob_id: BlobId,
1007    ) -> Result<bool, ExecutionError> {
1008        if self.used_blobs.contains(&blob_id).await? {
1009            return Ok(false); // Nothing to do.
1010        }
1011        self.used_blobs.insert(&blob_id)?;
1012        txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
1013        Ok(true)
1014    }
1015
1016    /// Records a blob that is published in this block. This does not create an oracle entry, and
1017    /// the blob can be used without using an oracle in the future on this chain.
1018    fn blob_published(
1019        &mut self,
1020        blob_id: &BlobId,
1021        txn_tracker: &mut TransactionTracker,
1022    ) -> Result<(), ExecutionError> {
1023        self.used_blobs.insert(blob_id)?;
1024        txn_tracker.add_published_blob(*blob_id);
1025        Ok(())
1026    }
1027
1028    pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1029        match self.context().extra().get_blob(blob_id).await {
1030            Ok(Some(blob)) => Ok(blob.into()),
1031            Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1032            Err(error) => Err(error.into()),
1033        }
1034    }
1035
1036    pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1037        if self.context().extra().contains_blob(blob_id).await? {
1038            Ok(())
1039        } else {
1040            Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1041        }
1042    }
1043
1044    async fn check_bytecode_blobs(
1045        &mut self,
1046        module_id: &ModuleId,
1047        txn_tracker: &TransactionTracker,
1048    ) -> Result<Vec<BlobId>, ExecutionError> {
1049        let blob_ids = module_id.bytecode_blob_ids();
1050
1051        let mut missing_blobs = Vec::new();
1052        for blob_id in &blob_ids {
1053            // First check if blob is present in created_blobs
1054            if txn_tracker.created_blobs().contains_key(blob_id) {
1055                continue; // Blob found in created_blobs, it's ok
1056            }
1057            // If not in created_blobs, check storage
1058            if !self.context().extra().contains_blob(*blob_id).await? {
1059                missing_blobs.push(*blob_id);
1060            }
1061        }
1062        ensure!(
1063            missing_blobs.is_empty(),
1064            ExecutionError::BlobsNotFound(missing_blobs)
1065        );
1066
1067        Ok(blob_ids)
1068    }
1069}