linera_execution/
system.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(test)]
6#[path = "./unit_tests/system_tests.rs"]
7mod tests;
8
9use std::collections::{BTreeMap, BTreeSet, HashSet};
10
11use allocative::Allocative;
12use custom_debug_derive::Debug;
13use linera_base::{
14    crypto::CryptoHash,
15    data_types::{
16        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
17        ChainDescription, ChainOrigin, Epoch, InitialChainConfig, OracleResponse, Timestamp,
18    },
19    ensure, hex_debug,
20    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, ModuleId, StreamId},
21    ownership::{ChainOwnership, TimeoutConfig},
22};
23use linera_views::{
24    context::Context,
25    map_view::HashedMapView,
26    register_view::HashedRegisterView,
27    set_view::HashedSetView,
28    views::{ClonableView, HashableView, ReplaceContext, View},
29};
30use serde::{Deserialize, Serialize};
31
32#[cfg(test)]
33use crate::test_utils::SystemExecutionState;
34use crate::{
35    committee::Committee, util::OracleResponseExt as _, ApplicationDescription, ApplicationId,
36    ExecutionError, ExecutionRuntimeContext, MessageContext, MessageKind, OperationContext,
37    OutgoingMessage, QueryContext, QueryOutcome, ResourceController, TransactionTracker,
38};
39
/// The event stream name for new epochs and committees.
///
/// Wrapped with `StreamId::system` when events about newly created committees are
/// published or looked up.
pub static EPOCH_STREAM_NAME: &[u8] = &[0];
/// The event stream name for removed epochs.
///
/// Wrapped with `StreamId::system` when events about removed committees are published
/// or looked up.
pub static REMOVED_EPOCH_STREAM_NAME: &[u8] = &[1];
44
/// Prometheus metrics recorded by the system application.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// The number of times the `SystemOperation::OpenChain` operation was executed.
    ///
    /// The counter is registered lazily on first access and has no labels.
    pub static OPEN_CHAIN_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "open_chain_count",
            "The number of times the `OpenChain` operation was executed",
            &[],
        )
    });
}
61
/// A view accessing the execution state of the system of a chain.
#[derive(Debug, ClonableView, HashableView, Allocative)]
#[allocative(bound = "C")]
pub struct SystemExecutionStateView<C> {
    /// How the chain was created. May be unknown for inactive chains.
    pub description: HashedRegisterView<C, Option<ChainDescription>>,
    /// The number identifying the current configuration.
    pub epoch: HashedRegisterView<C, Epoch>,
    /// The admin of the chain.
    pub admin_id: HashedRegisterView<C, Option<ChainId>>,
    /// The committees that we trust, indexed by epoch number.
    // Not using a `MapView` because the set of active committees is supposed to be
    // small. Plus, currently, we would create the `BTreeMap` anyway in various places
    // (e.g. the `OpenChain` operation).
    pub committees: HashedRegisterView<C, BTreeMap<Epoch, Committee>>,
    /// Ownership of the chain.
    pub ownership: HashedRegisterView<C, ChainOwnership>,
    /// Balance of the chain. (Available to any user able to create blocks in the chain.)
    pub balance: HashedRegisterView<C, Amount>,
    /// Balances attributed to a given owner.
    pub balances: HashedMapView<C, AccountOwner, Amount>,
    /// The timestamp of the most recent block.
    pub timestamp: HashedRegisterView<C, Timestamp>,
    /// Whether this chain has been closed.
    pub closed: HashedRegisterView<C, bool>,
    /// Permissions for applications on this chain.
    pub application_permissions: HashedRegisterView<C, ApplicationPermissions>,
    /// Blobs that have been used or published on this chain.
    pub used_blobs: HashedSetView<C, BlobId>,
    /// The event stream subscriptions of applications on this chain, keyed by the
    /// publishing chain and the stream.
    pub event_subscriptions: HashedMapView<C, (ChainId, StreamId), EventSubscriptions>,
}
94
95impl<C: Context, C2: Context> ReplaceContext<C2> for SystemExecutionStateView<C> {
96    type Target = SystemExecutionStateView<C2>;
97
98    async fn with_context(
99        &mut self,
100        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
101    ) -> Self::Target {
102        SystemExecutionStateView {
103            description: self.description.with_context(ctx.clone()).await,
104            epoch: self.epoch.with_context(ctx.clone()).await,
105            admin_id: self.admin_id.with_context(ctx.clone()).await,
106            committees: self.committees.with_context(ctx.clone()).await,
107            ownership: self.ownership.with_context(ctx.clone()).await,
108            balance: self.balance.with_context(ctx.clone()).await,
109            balances: self.balances.with_context(ctx.clone()).await,
110            timestamp: self.timestamp.with_context(ctx.clone()).await,
111            closed: self.closed.with_context(ctx.clone()).await,
112            application_permissions: self.application_permissions.with_context(ctx.clone()).await,
113            used_blobs: self.used_blobs.with_context(ctx.clone()).await,
114            event_subscriptions: self.event_subscriptions.with_context(ctx.clone()).await,
115        }
116    }
117}
118
/// The applications subscribing to a particular stream, and the next event index.
///
/// The derived `Default` value has `next_index == 0` and no subscribed applications.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Allocative)]
pub struct EventSubscriptions {
    /// The next event index, i.e. the total number of events in this stream that have already
    /// been processed by this chain.
    pub next_index: u32,
    /// The applications that are subscribed to this stream.
    pub applications: BTreeSet<ApplicationId>,
}
128
/// The initial configuration for a new chain.
///
/// This is the payload of [`SystemOperation::OpenChain`].
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub struct OpenChainConfig {
    /// The ownership configuration of the new chain.
    pub ownership: ChainOwnership,
    /// The initial chain balance.
    pub balance: Amount,
    /// The initial application permissions.
    pub application_permissions: ApplicationPermissions,
}
139
140impl OpenChainConfig {
141    /// Creates an [`InitialChainConfig`] based on this [`OpenChainConfig`] and additional
142    /// parameters.
143    pub fn init_chain_config(
144        &self,
145        epoch: Epoch,
146        min_active_epoch: Epoch,
147        max_active_epoch: Epoch,
148    ) -> InitialChainConfig {
149        InitialChainConfig {
150            application_permissions: self.application_permissions.clone(),
151            balance: self.balance,
152            epoch,
153            min_active_epoch,
154            max_active_epoch,
155            ownership: self.ownership.clone(),
156        }
157    }
158}
159
/// A system operation.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub enum SystemOperation {
    /// Transfers `amount` units of value from the given owner's account to the recipient.
    /// If the owner is `AccountOwner::CHAIN`, the units are taken out of the chain's own
    /// balance instead of a per-owner balance.
    Transfer {
        /// The account to debit on this chain.
        owner: AccountOwner,
        /// The account to credit, possibly on another chain.
        recipient: Account,
        /// The number of units to transfer. Must be greater than zero.
        amount: Amount,
    },
    /// Claims `amount` units of value from the given owner's account in the remote
    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
    /// process the message.
    Claim {
        /// The account to debit on the target chain.
        owner: AccountOwner,
        /// The chain holding the account to debit.
        target_id: ChainId,
        /// The account to credit with the claimed units.
        recipient: Account,
        /// The number of units to claim. Must be greater than zero.
        amount: Amount,
    },
    /// Creates (or activates) a new chain.
    /// This will automatically subscribe to the future committees created by `admin_id`.
    OpenChain(OpenChainConfig),
    /// Closes the chain.
    CloseChain,
    /// Changes the ownership of the chain.
    ChangeOwnership {
        /// Super owners can propose fast blocks in the first round, and regular blocks in any round.
        #[debug(skip_if = Vec::is_empty)]
        super_owners: Vec<AccountOwner>,
        /// The regular owners, with their weights that determine how often they are round leader.
        #[debug(skip_if = Vec::is_empty)]
        owners: Vec<(AccountOwner, u64)>,
        /// The number of initial rounds after 0 in which all owners are allowed to propose blocks.
        multi_leader_rounds: u32,
        /// Whether the multi-leader rounds are unrestricted, i.e. not limited to chain owners.
        /// This should only be `true` on chains with restrictive application permissions and an
        /// application-based mechanism to select block proposers.
        open_multi_leader_rounds: bool,
        /// The timeout configuration: how long fast, multi-leader and single-leader rounds last.
        timeout_config: TimeoutConfig,
    },
    /// Changes the application permissions configuration on this chain.
    ChangeApplicationPermissions(ApplicationPermissions),
    /// Publishes a new application module.
    PublishModule { module_id: ModuleId },
    /// Publishes a new data blob.
    PublishDataBlob { blob_hash: CryptoHash },
    /// Verifies that the given blob exists. Otherwise the block fails.
    VerifyBlob { blob_id: BlobId },
    /// Creates a new application.
    CreateApplication {
        /// The module that the new application is created from.
        module_id: ModuleId,
        /// The application parameters, as raw bytes.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug")]
        parameters: Vec<u8>,
        /// The argument handed back to the caller for instantiating the application, as
        /// raw bytes.
        #[serde(with = "serde_bytes")]
        #[debug(with = "hex_debug", skip_if = Vec::is_empty)]
        instantiation_argument: Vec<u8>,
        /// The applications that the new application requires.
        #[debug(skip_if = Vec::is_empty)]
        required_application_ids: Vec<ApplicationId>,
    },
    /// Operations that are only allowed on the admin chain.
    Admin(AdminOperation),
    /// Processes an event about a new epoch and committee.
    ProcessNewEpoch(Epoch),
    /// Processes an event about a removed epoch and committee.
    ProcessRemovedEpoch(Epoch),
    /// Updates the event stream trackers.
    ///
    /// Each entry is a `(chain_id, stream_id, next_index)` triple.
    UpdateStreams(Vec<(ChainId, StreamId, u32)>),
}
230
/// Operations that are only allowed on the admin chain.
///
/// Executing any of these on a chain whose `admin_id` is not the chain itself fails with
/// `ExecutionError::AdminOperationOnNonAdminChain`.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub enum AdminOperation {
    /// Publishes a new committee as a blob. This can be assigned to an epoch using
    /// [`AdminOperation::CreateCommittee`] in a later block.
    PublishCommitteeBlob { blob_hash: CryptoHash },
    /// Registers a new committee. Other chains can then migrate to the new epoch by executing
    /// [`SystemOperation::ProcessNewEpoch`].
    ///
    /// `epoch` must be exactly one higher than the chain's current epoch, and `blob_hash`
    /// identifies the committee blob to read.
    CreateCommittee { epoch: Epoch, blob_hash: CryptoHash },
    /// Removes a committee. Other chains should execute [`SystemOperation::ProcessRemovedEpoch`],
    /// so that blocks from the retired epoch will not be accepted until they are followed (hence
    /// re-certified) by a block certified by a recent committee.
    RemoveCommittee { epoch: Epoch },
}
245
/// A system message meant to be executed on a remote chain.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, Allocative)]
pub enum SystemMessage {
    /// Credits `amount` units of value to the account `target` -- unless the message is
    /// bouncing, in which case `source` is credited instead.
    Credit {
        /// The account to credit when the message is delivered normally.
        target: AccountOwner,
        /// The number of units to credit.
        amount: Amount,
        /// The account that was debited; it is credited back if the message bounces.
        source: AccountOwner,
    },
    /// Withdraws `amount` units of value from the account and starts a transfer to credit
    /// the recipient. The message must be properly authenticated. Receiver chains may
    /// refuse it depending on their configuration.
    Withdraw {
        /// The account to debit on the receiving chain.
        owner: AccountOwner,
        /// The number of units to withdraw.
        amount: Amount,
        /// The account to credit with the withdrawn units.
        recipient: Account,
    },
}
265
/// A query to the system state.
///
/// This is a unit struct: the query itself carries no parameters.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemQuery;
269
/// The response to a system query.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct SystemResponse {
    /// The ID of the chain that answered the query.
    pub chain_id: ChainId,
    /// The balance reported for that chain.
    pub balance: Amount,
}
276
/// Optional user message attached to a transfer.
///
/// When present, the message is a fixed-size array of exactly 32 bytes.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)]
pub struct UserData(pub Option<[u8; 32]>);
280
281impl UserData {
282    pub fn from_option_string(opt_str: Option<String>) -> Result<Self, usize> {
283        // Convert the Option<String> to Option<[u8; 32]>
284        let option_array = match opt_str {
285            Some(s) => {
286                // Convert the String to a Vec<u8>
287                let vec = s.into_bytes();
288                if vec.len() <= 32 {
289                    // Create an array from the Vec<u8>
290                    let mut array = [b' '; 32];
291
292                    // Copy bytes from the vector into the array
293                    let len = vec.len().min(32);
294                    array[..len].copy_from_slice(&vec[..len]);
295
296                    Some(array)
297                } else {
298                    return Err(vec.len());
299                }
300            }
301            None => None,
302        };
303
304        // Return the UserData with the converted Option<[u8; 32]>
305        Ok(UserData(option_array))
306    }
307}
308
/// The outcome of creating an application.
#[derive(Debug)]
pub struct CreateApplicationResult {
    /// The ID of the newly created application.
    pub app_id: ApplicationId,
}
313
314impl<C> SystemExecutionStateView<C>
315where
316    C: Context + Clone + Send + Sync + 'static,
317    C::Extra: ExecutionRuntimeContext,
318{
319    /// Invariant for the states of active chains.
320    pub fn is_active(&self) -> bool {
321        self.description.get().is_some()
322            && self.ownership.get().is_active()
323            && self.current_committee().is_some()
324            && self.admin_id.get().is_some()
325    }
326
327    /// Returns the current committee, if any.
328    pub fn current_committee(&self) -> Option<(Epoch, &Committee)> {
329        let epoch = self.epoch.get();
330        let committee = self.committees.get().get(epoch)?;
331        Some((*epoch, committee))
332    }
333
334    async fn get_event(&self, event_id: EventId) -> Result<Vec<u8>, ExecutionError> {
335        match self.context().extra().get_event(event_id.clone()).await? {
336            None => Err(ExecutionError::EventsNotFound(vec![event_id])),
337            Some(vec) => Ok(vec),
338        }
339    }
340
    /// Executes the sender's side of a system operation.
    ///
    /// Outgoing messages, events and blob usage produced by the operation are recorded in
    /// `txn_tracker`.
    ///
    /// Returns `Some((application_id, instantiation_argument))` if the operation was
    /// [`SystemOperation::CreateApplication`], and `None` for every other operation.
    pub async fn execute_operation(
        &mut self,
        context: OperationContext,
        operation: SystemOperation,
        txn_tracker: &mut TransactionTracker,
        resource_controller: &mut ResourceController<Option<AccountOwner>>,
    ) -> Result<Option<(ApplicationId, Vec<u8>)>, ExecutionError> {
        use SystemOperation::*;
        // Only set by `CreateApplication`.
        let mut new_application = None;
        match operation {
            OpenChain(config) => {
                let _chain_id = self
                    .open_chain(
                        config,
                        context.chain_id,
                        context.height,
                        context.timestamp,
                        txn_tracker,
                    )
                    .await?;
                #[cfg(with_metrics)]
                metrics::OPEN_CHAIN_COUNT.with_label_values(&[]).inc();
            }
            ChangeOwnership {
                super_owners,
                owners,
                multi_leader_rounds,
                open_multi_leader_rounds,
                timeout_config,
            } => {
                // The previous ownership is replaced wholesale.
                self.ownership.set(ChainOwnership {
                    super_owners: super_owners.into_iter().collect(),
                    owners: owners.into_iter().collect(),
                    multi_leader_rounds,
                    open_multi_leader_rounds,
                    timeout_config,
                });
            }
            ChangeApplicationPermissions(application_permissions) => {
                self.application_permissions.set(application_permissions);
            }
            CloseChain => self.close_chain().await?,
            Transfer {
                owner,
                amount,
                recipient,
            } => {
                // No application is authenticated here: only the block signer can
                // authorize the transfer.
                let maybe_message = self
                    .transfer(context.authenticated_signer, None, owner, recipient, amount)
                    .await?;
                txn_tracker.add_outgoing_messages(maybe_message);
            }
            Claim {
                owner,
                target_id,
                recipient,
                amount,
            } => {
                // As for `Transfer`, no application is authenticated here.
                let maybe_message = self
                    .claim(
                        context.authenticated_signer,
                        None,
                        owner,
                        target_id,
                        recipient,
                        amount,
                    )
                    .await?;
                txn_tracker.add_outgoing_messages(maybe_message);
            }
            Admin(admin_operation) => {
                // Admin operations are rejected unless this chain is its own admin chain.
                ensure!(
                    *self.admin_id.get() == Some(context.chain_id),
                    ExecutionError::AdminOperationOnNonAdminChain
                );
                match admin_operation {
                    AdminOperation::PublishCommitteeBlob { blob_hash } => {
                        self.blob_published(
                            &BlobId::new(blob_hash, BlobType::Committee),
                            txn_tracker,
                        )?;
                    }
                    AdminOperation::CreateCommittee { epoch, blob_hash } => {
                        self.check_next_epoch(epoch)?;
                        // The committee is deserialized from the committee blob.
                        let blob_id = BlobId::new(blob_hash, BlobType::Committee);
                        let committee =
                            bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
                        self.blob_used(txn_tracker, blob_id).await?;
                        self.committees.get_mut().insert(epoch, committee);
                        self.epoch.set(epoch);
                        // Publish the blob hash on the epoch stream, indexed by epoch number.
                        txn_tracker.add_event(
                            StreamId::system(EPOCH_STREAM_NAME),
                            epoch.0,
                            bcs::to_bytes(&blob_hash)?,
                        );
                    }
                    AdminOperation::RemoveCommittee { epoch } => {
                        // Removing a committee that is not registered is an error.
                        ensure!(
                            self.committees.get_mut().remove(&epoch).is_some(),
                            ExecutionError::InvalidCommitteeRemoval
                        );
                        // The removal event carries no payload.
                        txn_tracker.add_event(
                            StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                            epoch.0,
                            vec![],
                        );
                    }
                }
            }
            PublishModule { module_id } => {
                for blob_id in module_id.bytecode_blob_ids() {
                    self.blob_published(&blob_id, txn_tracker)?;
                }
            }
            CreateApplication {
                module_id,
                parameters,
                instantiation_argument,
                required_application_ids,
            } => {
                let CreateApplicationResult { app_id } = self
                    .create_application(
                        context.chain_id,
                        context.height,
                        module_id,
                        parameters,
                        required_application_ids,
                        txn_tracker,
                    )
                    .await?;
                // Hand the instantiation argument back to the caller with the new ID.
                new_application = Some((app_id, instantiation_argument));
            }
            PublishDataBlob { blob_hash } => {
                self.blob_published(&BlobId::new(blob_hash, BlobType::Data), txn_tracker)?;
            }
            VerifyBlob { blob_id } => {
                self.assert_blob_exists(blob_id).await?;
                // NOTE(review): the blob read is tracked with an argument of `0`;
                // presumably a size in bytes -- confirm against `track_blob_read`.
                resource_controller
                    .with_state(self)
                    .await?
                    .track_blob_read(0)?;
                self.blob_used(txn_tracker, blob_id).await?;
            }
            ProcessNewEpoch(epoch) => {
                self.check_next_epoch(epoch)?;
                let admin_id = self
                    .admin_id
                    .get()
                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
                // The event on the admin chain's epoch stream whose index is the epoch number.
                let event_id = EventId {
                    chain_id: admin_id,
                    stream_id: StreamId::system(EPOCH_STREAM_NAME),
                    index: epoch.0,
                };
                // The event is obtained through the transaction tracker's oracle call.
                let bytes = txn_tracker
                    .oracle(|| async {
                        let bytes = self.get_event(event_id.clone()).await?;
                        Ok(OracleResponse::Event(event_id.clone(), bytes))
                    })
                    .await?
                    .to_event(&event_id)?;
                // The event payload is the BCS-encoded hash of the committee blob.
                let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
                let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
                self.blob_used(txn_tracker, blob_id).await?;
                self.committees.get_mut().insert(epoch, committee);
                self.epoch.set(epoch);
            }
            ProcessRemovedEpoch(epoch) => {
                // Removing a committee that is not registered is an error.
                ensure!(
                    self.committees.get_mut().remove(&epoch).is_some(),
                    ExecutionError::InvalidCommitteeRemoval
                );
                let admin_id = self
                    .admin_id
                    .get()
                    .ok_or_else(|| ExecutionError::InactiveChain(context.chain_id))?;
                let event_id = EventId {
                    chain_id: admin_id,
                    stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                    index: epoch.0,
                };
                // The event payload is not used; fetching it fails if the admin chain's
                // removal event is not available.
                txn_tracker
                    .oracle(|| async {
                        let bytes = self.get_event(event_id.clone()).await?;
                        Ok(OracleResponse::Event(event_id, bytes))
                    })
                    .await?;
            }
            UpdateStreams(streams) => {
                // Missing events are collected and reported together after the loop.
                let mut missing_events = Vec::new();
                for (chain_id, stream_id, next_index) in streams {
                    let subscriptions = self
                        .event_subscriptions
                        .get_mut_or_default(&(chain_id, stream_id.clone()))
                        .await?;
                    // The tracker must move strictly forward.
                    ensure!(
                        subscriptions.next_index < next_index,
                        ExecutionError::OutdatedUpdateStreams
                    );
                    // Notify every subscribed application, passing the previous and the
                    // new next index.
                    for application_id in &subscriptions.applications {
                        txn_tracker.add_stream_to_process(
                            *application_id,
                            chain_id,
                            stream_id.clone(),
                            subscriptions.next_index,
                            next_index,
                        );
                    }
                    subscriptions.next_index = next_index;
                    // Index of the last event covered by this update.
                    let index = next_index
                        .checked_sub(1)
                        .ok_or(ArithmeticError::Underflow)?;
                    let event_id = EventId {
                        chain_id,
                        stream_id,
                        index,
                    };
                    let extra = self.context().extra();
                    // Check that this last event exists.
                    txn_tracker
                        .oracle(|| async {
                            if !extra.contains_event(event_id.clone()).await? {
                                missing_events.push(event_id.clone());
                            }
                            Ok(OracleResponse::EventExists(event_id))
                        })
                        .await?;
                }
                ensure!(
                    missing_events.is_empty(),
                    ExecutionError::EventsNotFound(missing_events)
                );
            }
        }

        Ok(new_application)
    }
579
580    /// Returns an error if the `provided` epoch is not exactly one higher than the chain's current
581    /// epoch.
582    fn check_next_epoch(&self, provided: Epoch) -> Result<(), ExecutionError> {
583        let expected = self.epoch.get().try_add_one()?;
584        ensure!(
585            provided == expected,
586            ExecutionError::InvalidCommitteeEpoch { provided, expected }
587        );
588        Ok(())
589    }
590
591    async fn credit(&mut self, owner: &AccountOwner, amount: Amount) -> Result<(), ExecutionError> {
592        if owner == &AccountOwner::CHAIN {
593            let new_balance = self.balance.get().saturating_add(amount);
594            self.balance.set(new_balance);
595        } else {
596            let balance = self.balances.get_mut_or_default(owner).await?;
597            *balance = balance.saturating_add(amount);
598        }
599        Ok(())
600    }
601
602    async fn credit_or_send_message(
603        &mut self,
604        source: AccountOwner,
605        recipient: Account,
606        amount: Amount,
607    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
608        let source_chain_id = self.context().extra().chain_id();
609        if recipient.chain_id == source_chain_id {
610            // Handle same-chain transfer locally.
611            let target = recipient.owner;
612            self.credit(&target, amount).await?;
613            Ok(None)
614        } else {
615            // Handle cross-chain transfer with message.
616            let message = SystemMessage::Credit {
617                amount,
618                source,
619                target: recipient.owner,
620            };
621            Ok(Some(
622                OutgoingMessage::new(recipient.chain_id, message).with_kind(MessageKind::Tracked),
623            ))
624        }
625    }
626
627    pub async fn transfer(
628        &mut self,
629        authenticated_signer: Option<AccountOwner>,
630        authenticated_application_id: Option<ApplicationId>,
631        source: AccountOwner,
632        recipient: Account,
633        amount: Amount,
634    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
635        if source == AccountOwner::CHAIN {
636            ensure!(
637                authenticated_signer.is_some()
638                    && self
639                        .ownership
640                        .get()
641                        .verify_owner(&authenticated_signer.unwrap()),
642                ExecutionError::UnauthenticatedTransferOwner
643            );
644        } else {
645            ensure!(
646                authenticated_signer == Some(source)
647                    || authenticated_application_id.map(AccountOwner::from) == Some(source),
648                ExecutionError::UnauthenticatedTransferOwner
649            );
650        }
651        ensure!(
652            amount > Amount::ZERO,
653            ExecutionError::IncorrectTransferAmount
654        );
655        self.debit(&source, amount).await?;
656        self.credit_or_send_message(source, recipient, amount).await
657    }
658
659    pub async fn claim(
660        &mut self,
661        authenticated_signer: Option<AccountOwner>,
662        authenticated_application_id: Option<ApplicationId>,
663        source: AccountOwner,
664        target_id: ChainId,
665        recipient: Account,
666        amount: Amount,
667    ) -> Result<Option<OutgoingMessage>, ExecutionError> {
668        ensure!(
669            authenticated_signer == Some(source)
670                || authenticated_application_id.map(AccountOwner::from) == Some(source),
671            ExecutionError::UnauthenticatedClaimOwner
672        );
673        ensure!(amount > Amount::ZERO, ExecutionError::IncorrectClaimAmount);
674
675        let current_chain_id = self.context().extra().chain_id();
676        if target_id == current_chain_id {
677            // Handle same-chain claim locally by processing the withdraw operation directly
678            self.debit(&source, amount).await?;
679            self.credit_or_send_message(source, recipient, amount).await
680        } else {
681            // Handle cross-chain claim with Withdraw message
682            let message = SystemMessage::Withdraw {
683                amount,
684                owner: source,
685                recipient,
686            };
687            Ok(Some(
688                OutgoingMessage::new(target_id, message)
689                    .with_authenticated_signer(authenticated_signer),
690            ))
691        }
692    }
693
694    /// Debits an [`Amount`] of tokens from an account's balance.
695    async fn debit(
696        &mut self,
697        account: &AccountOwner,
698        amount: Amount,
699    ) -> Result<(), ExecutionError> {
700        let balance = if account == &AccountOwner::CHAIN {
701            self.balance.get_mut()
702        } else {
703            self.balances.get_mut(account).await?.ok_or_else(|| {
704                ExecutionError::InsufficientBalance {
705                    balance: Amount::ZERO,
706                    account: *account,
707                }
708            })?
709        };
710
711        balance
712            .try_sub_assign(amount)
713            .map_err(|_| ExecutionError::InsufficientBalance {
714                balance: *balance,
715                account: *account,
716            })?;
717
718        if account != &AccountOwner::CHAIN && balance.is_zero() {
719            self.balances.remove(account)?;
720        }
721
722        Ok(())
723    }
724
725    /// Executes a cross-chain message that represents the recipient's side of an operation.
726    pub async fn execute_message(
727        &mut self,
728        context: MessageContext,
729        message: SystemMessage,
730    ) -> Result<Vec<OutgoingMessage>, ExecutionError> {
731        let mut outcome = Vec::new();
732        use SystemMessage::*;
733        match message {
734            Credit {
735                amount,
736                source,
737                target,
738            } => {
739                let receiver = if context.is_bouncing { source } else { target };
740                self.credit(&receiver, amount).await?;
741            }
742            Withdraw {
743                amount,
744                owner,
745                recipient,
746            } => {
747                self.debit(&owner, amount).await?;
748                if let Some(message) = self
749                    .credit_or_send_message(owner, recipient, amount)
750                    .await?
751                {
752                    outcome.push(message);
753                }
754            }
755        }
756        Ok(outcome)
757    }
758
759    /// Initializes the system application state on a newly opened chain.
760    /// Returns `Ok(true)` if the chain was already initialized, `Ok(false)` if it wasn't.
761    pub async fn initialize_chain(&mut self, chain_id: ChainId) -> Result<bool, ExecutionError> {
762        if self.description.get().is_some() {
763            // already initialized
764            return Ok(true);
765        }
766        let description_blob = self
767            .read_blob_content(BlobId::new(chain_id.0, BlobType::ChainDescription))
768            .await?;
769        let description: ChainDescription = bcs::from_bytes(description_blob.bytes())?;
770        let InitialChainConfig {
771            ownership,
772            epoch,
773            balance,
774            min_active_epoch,
775            max_active_epoch,
776            application_permissions,
777        } = description.config().clone();
778        self.timestamp.set(description.timestamp());
779        self.description.set(Some(description));
780        self.epoch.set(epoch);
781        let committees = self
782            .context()
783            .extra()
784            .committees_for(min_active_epoch..=max_active_epoch)
785            .await?;
786        self.committees.set(committees);
787        let admin_id = self
788            .context()
789            .extra()
790            .get_network_description()
791            .await?
792            .ok_or(ExecutionError::NoNetworkDescriptionFound)?
793            .admin_chain_id;
794        self.admin_id.set(Some(admin_id));
795        self.ownership.set(ownership);
796        self.balance.set(balance);
797        self.application_permissions.set(application_permissions);
798        Ok(false)
799    }
800
801    pub async fn handle_query(
802        &mut self,
803        context: QueryContext,
804        _query: SystemQuery,
805    ) -> Result<QueryOutcome<SystemResponse>, ExecutionError> {
806        let response = SystemResponse {
807            chain_id: context.chain_id,
808            balance: *self.balance.get(),
809        };
810        Ok(QueryOutcome {
811            response,
812            operations: vec![],
813        })
814    }
815
816    /// Returns the messages to open a new chain, and subtracts the new chain's balance
817    /// from this chain's.
818    pub async fn open_chain(
819        &mut self,
820        config: OpenChainConfig,
821        parent: ChainId,
822        block_height: BlockHeight,
823        timestamp: Timestamp,
824        txn_tracker: &mut TransactionTracker,
825    ) -> Result<ChainId, ExecutionError> {
826        let chain_index = txn_tracker.next_chain_index();
827        let chain_origin = ChainOrigin::Child {
828            parent,
829            block_height,
830            chain_index,
831        };
832        let init_chain_config = config.init_chain_config(
833            *self.epoch.get(),
834            self.committees
835                .get()
836                .keys()
837                .min()
838                .copied()
839                .unwrap_or(Epoch::ZERO),
840            self.committees
841                .get()
842                .keys()
843                .max()
844                .copied()
845                .unwrap_or(Epoch::ZERO),
846        );
847        let chain_description = ChainDescription::new(chain_origin, init_chain_config, timestamp);
848        let child_id = chain_description.id();
849        self.debit(&AccountOwner::CHAIN, config.balance).await?;
850        let blob = Blob::new_chain_description(&chain_description);
851        txn_tracker.add_created_blob(blob);
852        Ok(child_id)
853    }
854
855    pub async fn close_chain(&mut self) -> Result<(), ExecutionError> {
856        self.closed.set(true);
857        Ok(())
858    }
859
860    pub async fn create_application(
861        &mut self,
862        chain_id: ChainId,
863        block_height: BlockHeight,
864        module_id: ModuleId,
865        parameters: Vec<u8>,
866        required_application_ids: Vec<ApplicationId>,
867        txn_tracker: &mut TransactionTracker,
868    ) -> Result<CreateApplicationResult, ExecutionError> {
869        let application_index = txn_tracker.next_application_index();
870
871        let blob_ids = self.check_bytecode_blobs(&module_id, txn_tracker).await?;
872        // We only remember to register the blobs that aren't recorded in `used_blobs`
873        // already.
874        for blob_id in blob_ids {
875            self.blob_used(txn_tracker, blob_id).await?;
876        }
877
878        let application_description = ApplicationDescription {
879            module_id,
880            creator_chain_id: chain_id,
881            block_height,
882            application_index,
883            parameters,
884            required_application_ids,
885        };
886        self.check_required_applications(&application_description, txn_tracker)
887            .await?;
888
889        let blob = Blob::new_application_description(&application_description);
890        self.used_blobs.insert(&blob.id())?;
891        txn_tracker.add_created_blob(blob);
892
893        Ok(CreateApplicationResult {
894            app_id: ApplicationId::from(&application_description),
895        })
896    }
897
898    async fn check_required_applications(
899        &mut self,
900        application_description: &ApplicationDescription,
901        txn_tracker: &mut TransactionTracker,
902    ) -> Result<(), ExecutionError> {
903        // Make sure that referenced applications IDs have been registered.
904        for required_id in &application_description.required_application_ids {
905            Box::pin(self.describe_application(*required_id, txn_tracker)).await?;
906        }
907        Ok(())
908    }
909
910    /// Retrieves an application's description.
911    pub async fn describe_application(
912        &mut self,
913        id: ApplicationId,
914        txn_tracker: &mut TransactionTracker,
915    ) -> Result<ApplicationDescription, ExecutionError> {
916        let blob_id = id.description_blob_id();
917        let content = match txn_tracker.created_blobs().get(&blob_id) {
918            Some(content) => content.clone(),
919            None => self.read_blob_content(blob_id).await?,
920        };
921        self.blob_used(txn_tracker, blob_id).await?;
922        let description: ApplicationDescription = bcs::from_bytes(content.bytes())?;
923
924        let blob_ids = self
925            .check_bytecode_blobs(&description.module_id, txn_tracker)
926            .await?;
927        // We only remember to register the blobs that aren't recorded in `used_blobs`
928        // already.
929        for blob_id in blob_ids {
930            self.blob_used(txn_tracker, blob_id).await?;
931        }
932
933        self.check_required_applications(&description, txn_tracker)
934            .await?;
935
936        Ok(description)
937    }
938
939    /// Retrieves the recursive dependencies of applications and applies a topological sort.
940    pub async fn find_dependencies(
941        &mut self,
942        mut stack: Vec<ApplicationId>,
943        txn_tracker: &mut TransactionTracker,
944    ) -> Result<Vec<ApplicationId>, ExecutionError> {
945        // What we return at the end.
946        let mut result = Vec::new();
947        // The entries already inserted in `result`.
948        let mut sorted = HashSet::new();
949        // The entries for which dependencies have already been pushed once to the stack.
950        let mut seen = HashSet::new();
951
952        while let Some(id) = stack.pop() {
953            if sorted.contains(&id) {
954                continue;
955            }
956            if seen.contains(&id) {
957                // Second time we see this entry. It was last pushed just before its
958                // dependencies -- which are now fully sorted.
959                sorted.insert(id);
960                result.push(id);
961                continue;
962            }
963            // First time we see this entry:
964            // 1. Mark it so that its dependencies are no longer pushed to the stack.
965            seen.insert(id);
966            // 2. Schedule all the (yet unseen) dependencies, then this entry for a second visit.
967            stack.push(id);
968            let app = self.describe_application(id, txn_tracker).await?;
969            for child in app.required_application_ids.iter().rev() {
970                if !seen.contains(child) {
971                    stack.push(*child);
972                }
973            }
974        }
975        Ok(result)
976    }
977
978    /// Records a blob that is used in this block. If this is the first use on this chain, creates
979    /// an oracle response for it.
980    pub(crate) async fn blob_used(
981        &mut self,
982        txn_tracker: &mut TransactionTracker,
983        blob_id: BlobId,
984    ) -> Result<bool, ExecutionError> {
985        if self.used_blobs.contains(&blob_id).await? {
986            return Ok(false); // Nothing to do.
987        }
988        self.used_blobs.insert(&blob_id)?;
989        txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
990        Ok(true)
991    }
992
993    /// Records a blob that is published in this block. This does not create an oracle entry, and
994    /// the blob can be used without using an oracle in the future on this chain.
995    fn blob_published(
996        &mut self,
997        blob_id: &BlobId,
998        txn_tracker: &mut TransactionTracker,
999    ) -> Result<(), ExecutionError> {
1000        self.used_blobs.insert(blob_id)?;
1001        txn_tracker.add_published_blob(*blob_id);
1002        Ok(())
1003    }
1004
1005    pub async fn read_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, ExecutionError> {
1006        match self.context().extra().get_blob(blob_id).await {
1007            Ok(Some(blob)) => Ok(blob.into()),
1008            Ok(None) => Err(ExecutionError::BlobsNotFound(vec![blob_id])),
1009            Err(error) => Err(error.into()),
1010        }
1011    }
1012
1013    pub async fn assert_blob_exists(&mut self, blob_id: BlobId) -> Result<(), ExecutionError> {
1014        if self.context().extra().contains_blob(blob_id).await? {
1015            Ok(())
1016        } else {
1017            Err(ExecutionError::BlobsNotFound(vec![blob_id]))
1018        }
1019    }
1020
1021    async fn check_bytecode_blobs(
1022        &mut self,
1023        module_id: &ModuleId,
1024        txn_tracker: &TransactionTracker,
1025    ) -> Result<Vec<BlobId>, ExecutionError> {
1026        let blob_ids = module_id.bytecode_blob_ids();
1027
1028        let mut missing_blobs = Vec::new();
1029        for blob_id in &blob_ids {
1030            // First check if blob is present in created_blobs
1031            if txn_tracker.created_blobs().contains_key(blob_id) {
1032                continue; // Blob found in created_blobs, it's ok
1033            }
1034            // If not in created_blobs, check storage
1035            if !self.context().extra().contains_blob(*blob_id).await? {
1036                missing_blobs.push(*blob_id);
1037            }
1038        }
1039        ensure!(
1040            missing_blobs.is_empty(),
1041            ExecutionError::BlobsNotFound(missing_blobs)
1042        );
1043
1044        Ok(blob_ids)
1045    }
1046}