linera_execution/
execution.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, HashMap},
6    mem, vec,
7};
8
9use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryStreamExt};
10use linera_base::{
11    data_types::{Amount, BlockHeight, Timestamp},
12    identifiers::{Account, ChainId, Destination, Owner},
13};
14use linera_views::{
15    context::Context,
16    key_value_store_view::KeyValueStoreView,
17    reentrant_collection_view::HashedReentrantCollectionView,
18    views::{ClonableView, View},
19};
20use linera_views_derive::CryptoHashView;
21#[cfg(with_testing)]
22use {
23    crate::{
24        ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
25    },
26    linera_base::data_types::Blob,
27    linera_views::context::MemoryContext,
28    std::sync::Arc,
29};
30
31use super::{runtime::ServiceRuntimeRequest, ExecutionRequest};
32use crate::{
33    resources::ResourceController, system::SystemExecutionStateView, ContractSyncRuntime,
34    ExecutionError, ExecutionOutcome, ExecutionRuntimeConfig, ExecutionRuntimeContext, Message,
35    MessageContext, MessageKind, Operation, OperationContext, Query, QueryContext,
36    RawExecutionOutcome, RawOutgoingMessage, Response, ServiceSyncRuntime, SystemMessage,
37    TransactionTracker, UserApplicationDescription, UserApplicationId,
38};
39
40/// A view accessing the execution state of a chain.
41#[derive(Debug, ClonableView, CryptoHashView)]
42pub struct ExecutionStateView<C> {
43    /// System application.
44    pub system: SystemExecutionStateView<C>,
45    /// User applications.
46    pub users: HashedReentrantCollectionView<C, UserApplicationId, KeyValueStoreView<C>>,
47}
48
49/// How to interact with a long-lived service runtime.
50pub struct ServiceRuntimeEndpoint {
51    /// How to receive requests.
52    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
53    /// How to query the runtime.
54    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
55}
56
57#[cfg(with_testing)]
58impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
59where
60    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
61{
62    /// Simulates the instantiation of an application.
63    pub async fn simulate_instantiation(
64        &mut self,
65        contract: UserContractCode,
66        local_time: Timestamp,
67        application_description: UserApplicationDescription,
68        instantiation_argument: Vec<u8>,
69        contract_blob: Blob,
70        service_blob: Blob,
71    ) -> Result<(), ExecutionError> {
72        let chain_id = application_description.creation.chain_id;
73        let context = OperationContext {
74            chain_id,
75            authenticated_signer: None,
76            authenticated_caller_id: None,
77            height: application_description.creation.height,
78            index: Some(0),
79        };
80
81        let action = UserAction::Instantiate(context, instantiation_argument);
82        let next_message_index = application_description.creation.index + 1;
83
84        let application_id = self
85            .system
86            .registry
87            .register_application(application_description)
88            .await?;
89
90        self.context()
91            .extra()
92            .user_contracts()
93            .insert(application_id, contract);
94
95        self.context()
96            .extra()
97            .add_blobs(vec![contract_blob, service_blob]);
98
99        let tracker = ResourceTracker::default();
100        let policy = ResourceControlPolicy::default();
101        let mut resource_controller = ResourceController {
102            policy: Arc::new(policy),
103            tracker,
104            account: None,
105        };
106        let mut txn_tracker = TransactionTracker::new(next_message_index, None);
107        self.run_user_action(
108            application_id,
109            chain_id,
110            local_time,
111            action,
112            context.refund_grant_to(),
113            None,
114            &mut txn_tracker,
115            &mut resource_controller,
116        )
117        .await?;
118        self.update_execution_outcomes_with_app_registrations(&mut txn_tracker)
119            .await?;
120        Ok(())
121    }
122}
123
124pub enum UserAction {
125    Instantiate(OperationContext, Vec<u8>),
126    Operation(OperationContext, Vec<u8>),
127    Message(MessageContext, Vec<u8>),
128}
129
130impl UserAction {
131    pub(crate) fn signer(&self) -> Option<Owner> {
132        use UserAction::*;
133        match self {
134            Instantiate(context, _) => context.authenticated_signer,
135            Operation(context, _) => context.authenticated_signer,
136            Message(context, _) => context.authenticated_signer,
137        }
138    }
139
140    pub(crate) fn height(&self) -> BlockHeight {
141        match self {
142            UserAction::Instantiate(context, _) => context.height,
143            UserAction::Operation(context, _) => context.height,
144            UserAction::Message(context, _) => context.height,
145        }
146    }
147}
148
149impl<C> ExecutionStateView<C>
150where
151    C: Context + Clone + Send + Sync + 'static,
152    C::Extra: ExecutionRuntimeContext,
153{
154    #[expect(clippy::too_many_arguments)]
155    async fn run_user_action(
156        &mut self,
157        application_id: UserApplicationId,
158        chain_id: ChainId,
159        local_time: Timestamp,
160        action: UserAction,
161        refund_grant_to: Option<Account>,
162        grant: Option<&mut Amount>,
163        txn_tracker: &mut TransactionTracker,
164        resource_controller: &mut ResourceController<Option<Owner>>,
165    ) -> Result<(), ExecutionError> {
166        let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
167        self.run_user_action_with_runtime(
168            application_id,
169            chain_id,
170            local_time,
171            action,
172            refund_grant_to,
173            grant,
174            txn_tracker,
175            resource_controller,
176        )
177        .await?;
178        Ok(())
179    }
180
181    #[expect(clippy::too_many_arguments)]
182    async fn run_user_action_with_runtime(
183        &mut self,
184        application_id: UserApplicationId,
185        chain_id: ChainId,
186        local_time: Timestamp,
187        action: UserAction,
188        refund_grant_to: Option<Account>,
189        grant: Option<&mut Amount>,
190        txn_tracker: &mut TransactionTracker,
191        resource_controller: &mut ResourceController<Option<Owner>>,
192    ) -> Result<(), ExecutionError> {
193        let mut cloned_grant = grant.as_ref().map(|x| **x);
194        let initial_balance = resource_controller
195            .with_state_and_grant(self, cloned_grant.as_mut())
196            .await?
197            .balance()?;
198        let controller = ResourceController {
199            policy: resource_controller.policy.clone(),
200            tracker: resource_controller.tracker,
201            account: initial_balance,
202        };
203        let (execution_state_sender, mut execution_state_receiver) =
204            futures::channel::mpsc::unbounded();
205        let txn_tracker_moved = mem::take(txn_tracker);
206        let execution_outcomes_future = linera_base::task::spawn_blocking(move || {
207            ContractSyncRuntime::run_action(
208                execution_state_sender,
209                application_id,
210                chain_id,
211                local_time,
212                refund_grant_to,
213                controller,
214                action,
215                txn_tracker_moved,
216            )
217        });
218        while let Some(request) = execution_state_receiver.next().await {
219            self.handle_request(request).await?;
220        }
221
222        let (controller, txn_tracker_moved) = execution_outcomes_future.await??;
223        *txn_tracker = txn_tracker_moved;
224        resource_controller
225            .with_state_and_grant(self, grant)
226            .await?
227            .merge_balance(initial_balance, controller.balance()?)?;
228        resource_controller.tracker = controller.tracker;
229        Ok(())
230    }
231
232    /// Schedules application registration messages when needed.
233    ///
234    /// Ensures that the outgoing messages in `results` are preceded by a system message that
235    /// registers the application that will handle the messages.
236    pub async fn update_execution_outcomes_with_app_registrations(
237        &self,
238        txn_tracker: &mut TransactionTracker,
239    ) -> Result<(), ExecutionError> {
240        let results = txn_tracker.outcomes_mut();
241        let user_application_outcomes = results.iter().filter_map(|outcome| match outcome {
242            ExecutionOutcome::User(application_id, result) => Some((application_id, result)),
243            _ => None,
244        });
245
246        let mut applications_to_register_per_destination = BTreeMap::<_, BTreeSet<_>>::new();
247
248        for (application_id, result) in user_application_outcomes {
249            for message in &result.messages {
250                applications_to_register_per_destination
251                    .entry(&message.destination)
252                    .or_default()
253                    .insert(*application_id);
254            }
255        }
256
257        if applications_to_register_per_destination.is_empty() {
258            return Ok(());
259        }
260
261        let messages = applications_to_register_per_destination
262            .into_iter()
263            .map(|(destination, applications_to_describe)| async {
264                let applications = self
265                    .system
266                    .registry
267                    .describe_applications_with_dependencies(
268                        applications_to_describe.into_iter().collect(),
269                        &HashMap::new(),
270                    )
271                    .await?;
272
273                Ok::<_, ExecutionError>(RawOutgoingMessage {
274                    destination: destination.clone(),
275                    authenticated: false,
276                    grant: Amount::ZERO,
277                    kind: MessageKind::Simple,
278                    message: SystemMessage::RegisterApplications { applications },
279                })
280            })
281            .collect::<FuturesOrdered<_>>()
282            .try_collect::<Vec<_>>()
283            .await?;
284
285        let system_outcome = RawExecutionOutcome {
286            messages,
287            ..RawExecutionOutcome::default()
288        };
289
290        // Insert the message before the first user outcome.
291        let index = results
292            .iter()
293            .position(|outcome| matches!(outcome, ExecutionOutcome::User(_, _)))
294            .unwrap_or(results.len());
295        // TODO(#2362): This inserts messages in front of existing ones, invalidating their IDs.
296        results.insert(index, ExecutionOutcome::System(system_outcome));
297
298        Ok(())
299    }
300
301    pub async fn execute_operation(
302        &mut self,
303        context: OperationContext,
304        local_time: Timestamp,
305        operation: Operation,
306        txn_tracker: &mut TransactionTracker,
307        resource_controller: &mut ResourceController<Option<Owner>>,
308    ) -> Result<(), ExecutionError> {
309        assert_eq!(context.chain_id, self.context().extra().chain_id());
310        match operation {
311            Operation::System(op) => {
312                let new_application = self
313                    .system
314                    .execute_operation(context, op, txn_tracker)
315                    .await?;
316                if let Some((application_id, argument)) = new_application {
317                    let user_action = UserAction::Instantiate(context, argument);
318                    self.run_user_action(
319                        application_id,
320                        context.chain_id,
321                        local_time,
322                        user_action,
323                        context.refund_grant_to(),
324                        None,
325                        txn_tracker,
326                        resource_controller,
327                    )
328                    .await?;
329                }
330            }
331            Operation::User {
332                application_id,
333                bytes,
334            } => {
335                self.run_user_action(
336                    application_id,
337                    context.chain_id,
338                    local_time,
339                    UserAction::Operation(context, bytes),
340                    context.refund_grant_to(),
341                    None,
342                    txn_tracker,
343                    resource_controller,
344                )
345                .await?;
346            }
347        }
348        Ok(())
349    }
350
351    pub async fn execute_message(
352        &mut self,
353        context: MessageContext,
354        local_time: Timestamp,
355        message: Message,
356        grant: Option<&mut Amount>,
357        txn_tracker: &mut TransactionTracker,
358        resource_controller: &mut ResourceController<Option<Owner>>,
359    ) -> Result<(), ExecutionError> {
360        assert_eq!(context.chain_id, self.context().extra().chain_id());
361        match message {
362            Message::System(message) => {
363                let outcome = self
364                    .system
365                    .execute_message(context, message, txn_tracker)
366                    .await?;
367                txn_tracker.add_system_outcome(outcome)?;
368            }
369            Message::User {
370                application_id,
371                bytes,
372            } => {
373                self.run_user_action(
374                    application_id,
375                    context.chain_id,
376                    local_time,
377                    UserAction::Message(context, bytes),
378                    context.refund_grant_to,
379                    grant,
380                    txn_tracker,
381                    resource_controller,
382                )
383                .await?;
384            }
385        }
386        Ok(())
387    }
388
389    pub async fn bounce_message(
390        &self,
391        context: MessageContext,
392        grant: Amount,
393        message: Message,
394        txn_tracker: &mut TransactionTracker,
395    ) -> Result<(), ExecutionError> {
396        assert_eq!(context.chain_id, self.context().extra().chain_id());
397        match message {
398            Message::System(message) => {
399                let mut outcome = RawExecutionOutcome {
400                    authenticated_signer: context.authenticated_signer,
401                    refund_grant_to: context.refund_grant_to,
402                    ..Default::default()
403                };
404                outcome.messages.push(RawOutgoingMessage {
405                    destination: Destination::Recipient(context.message_id.chain_id),
406                    authenticated: true,
407                    grant,
408                    kind: MessageKind::Bouncing,
409                    message,
410                });
411                txn_tracker.add_system_outcome(outcome)?;
412            }
413            Message::User {
414                application_id,
415                bytes,
416            } => {
417                let mut outcome = RawExecutionOutcome {
418                    authenticated_signer: context.authenticated_signer,
419                    refund_grant_to: context.refund_grant_to,
420                    ..Default::default()
421                };
422                outcome.messages.push(RawOutgoingMessage {
423                    destination: Destination::Recipient(context.message_id.chain_id),
424                    authenticated: true,
425                    grant,
426                    kind: MessageKind::Bouncing,
427                    message: bytes,
428                });
429                txn_tracker.add_user_outcome(application_id, outcome)?;
430            }
431        }
432        Ok(())
433    }
434
435    pub async fn send_refund(
436        &self,
437        context: MessageContext,
438        amount: Amount,
439        account: Account,
440        txn_tracker: &mut TransactionTracker,
441    ) -> Result<(), ExecutionError> {
442        assert_eq!(context.chain_id, self.context().extra().chain_id());
443        let mut outcome = RawExecutionOutcome::default();
444        let message = RawOutgoingMessage {
445            destination: Destination::Recipient(account.chain_id),
446            authenticated: false,
447            grant: Amount::ZERO,
448            kind: MessageKind::Tracked,
449            message: SystemMessage::Credit {
450                amount,
451                source: context.authenticated_signer,
452                target: account.owner,
453            },
454        };
455        outcome.messages.push(message);
456        txn_tracker.add_system_outcome(outcome)?;
457        Ok(())
458    }
459
460    pub async fn query_application(
461        &mut self,
462        context: QueryContext,
463        query: Query,
464        endpoint: Option<&mut ServiceRuntimeEndpoint>,
465    ) -> Result<Response, ExecutionError> {
466        assert_eq!(context.chain_id, self.context().extra().chain_id());
467        match query {
468            Query::System(query) => {
469                let response = self.system.handle_query(context, query).await?;
470                Ok(Response::System(response))
471            }
472            Query::User {
473                application_id,
474                bytes,
475            } => {
476                let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
477                let response = match endpoint {
478                    Some(endpoint) => {
479                        self.query_user_application_with_long_lived_service(
480                            application_id,
481                            context,
482                            bytes,
483                            &mut endpoint.incoming_execution_requests,
484                            &mut endpoint.runtime_request_sender,
485                        )
486                        .await?
487                    }
488                    None => {
489                        self.query_user_application(application_id, context, bytes)
490                            .await?
491                    }
492                };
493                Ok(Response::User(response))
494            }
495        }
496    }
497
498    async fn query_user_application(
499        &mut self,
500        application_id: UserApplicationId,
501        context: QueryContext,
502        query: Vec<u8>,
503    ) -> Result<Vec<u8>, ExecutionError> {
504        let (execution_state_sender, mut execution_state_receiver) =
505            futures::channel::mpsc::unbounded();
506        let execution_outcomes_future = linera_base::task::spawn_blocking(move || {
507            let mut runtime = ServiceSyncRuntime::new(execution_state_sender, context);
508            runtime.run_query(application_id, query)
509        });
510        while let Some(request) = execution_state_receiver.next().await {
511            self.handle_request(request).await?;
512        }
513
514        let response = execution_outcomes_future.await??;
515        Ok(response)
516    }
517
518    async fn query_user_application_with_long_lived_service(
519        &mut self,
520        application_id: UserApplicationId,
521        context: QueryContext,
522        query: Vec<u8>,
523        incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
524            ExecutionRequest,
525        >,
526        runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
527    ) -> Result<Vec<u8>, ExecutionError> {
528        let (response_sender, response_receiver) = oneshot::channel();
529        let mut response_receiver = response_receiver.fuse();
530
531        runtime_request_sender
532            .send(ServiceRuntimeRequest::Query {
533                application_id,
534                context,
535                query,
536                callback: response_sender,
537            })
538            .expect("Service runtime thread should only stop when `request_sender` is dropped");
539
540        loop {
541            futures::select! {
542                maybe_request = incoming_execution_requests.next() => {
543                    if let Some(request) = maybe_request {
544                        self.handle_request(request).await?;
545                    }
546                }
547                response = &mut response_receiver => {
548                    return response.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
549                }
550            }
551        }
552    }
553
554    pub async fn list_applications(
555        &self,
556    ) -> Result<Vec<(UserApplicationId, UserApplicationDescription)>, ExecutionError> {
557        let mut applications = vec![];
558        for index in self.system.registry.known_applications.indices().await? {
559            let application_description =
560                self.system.registry.known_applications.get(&index).await?;
561
562            if let Some(application_description) = application_description {
563                applications.push((index, application_description));
564            }
565        }
566        Ok(applications)
567    }
568}