linera_execution/
execution.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{mem, vec};
5
6use futures::{FutureExt, StreamExt};
7use linera_base::{
8    data_types::{Amount, BlockHeight},
9    identifiers::{Account, AccountOwner, Destination, StreamId},
10};
11use linera_views::{
12    context::Context,
13    key_value_store_view::KeyValueStoreView,
14    map_view::MapView,
15    reentrant_collection_view::HashedReentrantCollectionView,
16    views::{ClonableView, View},
17};
18use linera_views_derive::CryptoHashView;
19#[cfg(with_testing)]
20use {
21    crate::{
22        ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
23    },
24    linera_base::data_types::Blob,
25    linera_views::context::MemoryContext,
26    std::sync::Arc,
27};
28
29use super::{runtime::ServiceRuntimeRequest, ExecutionRequest};
30use crate::{
31    resources::ResourceController, system::SystemExecutionStateView, ApplicationDescription,
32    ApplicationId, ContractSyncRuntime, ExecutionError, ExecutionRuntimeConfig,
33    ExecutionRuntimeContext, Message, MessageContext, MessageKind, Operation, OperationContext,
34    OutgoingMessage, Query, QueryContext, QueryOutcome, ServiceSyncRuntime, SystemMessage,
35    TransactionTracker,
36};
37
38/// A view accessing the execution state of a chain.
39#[derive(Debug, ClonableView, CryptoHashView)]
40pub struct ExecutionStateView<C> {
41    /// System application.
42    pub system: SystemExecutionStateView<C>,
43    /// User applications.
44    pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
45    /// The number of events in the streams that this chain is writing to.
46    pub stream_event_counts: MapView<C, StreamId, u32>,
47}
48
49/// How to interact with a long-lived service runtime.
50pub struct ServiceRuntimeEndpoint {
51    /// How to receive requests.
52    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
53    /// How to query the runtime.
54    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
55}
56
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
{
    /// Simulates the instantiation of an application (only available with testing enabled).
    ///
    /// In order, this:
    /// 1. marks the contract and service blobs as used in the system state;
    /// 2. registers `contract` under the application's ID in the test runtime context;
    /// 3. adds the two bytecode blobs and the application description blob to the test
    ///    runtime context;
    /// 4. runs a [`UserAction::Instantiate`] for the application with a fee-free policy, a
    ///    fresh resource tracker and a fresh transaction tracker.
    ///
    /// # Errors
    ///
    /// Propagates any [`ExecutionError`] raised while recording the blobs or while running
    /// the instantiation action.
    ///
    /// # Panics
    ///
    /// Panics if the creator chain of `application_description` is not the chain of the
    /// test runtime context.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let chain_id = application_description.creator_chain_id;
        assert_eq!(chain_id, self.context().extra().chain_id);

        let application_id: ApplicationId = (&application_description).into();

        // Both bytecode blobs count as used by this chain.
        self.system.used_blobs.insert(&contract_blob.id())?;
        self.system.used_blobs.insert(&service_blob.id())?;

        // Make the contract code and all relevant blobs available to the test runtime.
        self.context()
            .extra()
            .user_contracts()
            .insert(application_id, contract);
        let description_blob = Blob::new_application_description(&application_description);
        self.context()
            .extra()
            .add_blobs([contract_blob, service_blob, description_blob])
            .await?;

        let mut resource_controller = ResourceController {
            tracker: ResourceTracker::default(),
            policy: Arc::new(ResourceControlPolicy::no_fees()),
            account: None,
        };
        // The message index starts at zero; the application index continues right after
        // the application being instantiated.
        let mut txn_tracker = TransactionTracker::new(
            local_time,
            0,
            0,
            application_description.application_index + 1,
            None,
        );
        txn_tracker.add_created_blob(Blob::new_application_description(&application_description));

        let context = OperationContext {
            chain_id,
            authenticated_signer: None,
            authenticated_caller_id: None,
            height: application_description.block_height,
            round: None,
            index: Some(0),
        };
        self.run_user_action(
            application_id,
            UserAction::Instantiate(context, instantiation_argument),
            context.refund_grant_to(),
            None,
            &mut txn_tracker,
            &mut resource_controller,
        )
        .await
    }
}
134
135pub enum UserAction {
136    Instantiate(OperationContext, Vec<u8>),
137    Operation(OperationContext, Vec<u8>),
138    Message(MessageContext, Vec<u8>),
139}
140
141impl UserAction {
142    pub(crate) fn signer(&self) -> Option<AccountOwner> {
143        use UserAction::*;
144        match self {
145            Instantiate(context, _) => context.authenticated_signer,
146            Operation(context, _) => context.authenticated_signer,
147            Message(context, _) => context.authenticated_signer,
148        }
149    }
150
151    pub(crate) fn height(&self) -> BlockHeight {
152        match self {
153            UserAction::Instantiate(context, _) => context.height,
154            UserAction::Operation(context, _) => context.height,
155            UserAction::Message(context, _) => context.height,
156        }
157    }
158
159    pub(crate) fn round(&self) -> Option<u32> {
160        match self {
161            UserAction::Instantiate(context, _) => context.round,
162            UserAction::Operation(context, _) => context.round,
163            UserAction::Message(context, _) => context.round,
164        }
165    }
166}
167
168impl<C> ExecutionStateView<C>
169where
170    C: Context + Clone + Send + Sync + 'static,
171    C::Extra: ExecutionRuntimeContext,
172{
173    async fn run_user_action(
174        &mut self,
175        application_id: ApplicationId,
176        action: UserAction,
177        refund_grant_to: Option<Account>,
178        grant: Option<&mut Amount>,
179        txn_tracker: &mut TransactionTracker,
180        resource_controller: &mut ResourceController<Option<AccountOwner>>,
181    ) -> Result<(), ExecutionError> {
182        let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
183        self.run_user_action_with_runtime(
184            application_id,
185            action,
186            refund_grant_to,
187            grant,
188            txn_tracker,
189            resource_controller,
190        )
191        .await
192    }
193
194    async fn run_user_action_with_runtime(
195        &mut self,
196        application_id: ApplicationId,
197        action: UserAction,
198        refund_grant_to: Option<Account>,
199        grant: Option<&mut Amount>,
200        txn_tracker: &mut TransactionTracker,
201        resource_controller: &mut ResourceController<Option<AccountOwner>>,
202    ) -> Result<(), ExecutionError> {
203        let chain_id = self.context().extra().chain_id();
204        let mut cloned_grant = grant.as_ref().map(|x| **x);
205        let initial_balance = resource_controller
206            .with_state_and_grant(&mut self.system, cloned_grant.as_mut())
207            .await?
208            .balance()?;
209        let controller = ResourceController {
210            policy: resource_controller.policy.clone(),
211            tracker: resource_controller.tracker,
212            account: initial_balance,
213        };
214        let (execution_state_sender, mut execution_state_receiver) =
215            futures::channel::mpsc::unbounded();
216        let (code, description) = self.load_contract(application_id, txn_tracker).await?;
217        let txn_tracker_moved = mem::take(txn_tracker);
218        let contract_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
219            let runtime = ContractSyncRuntime::new(
220                execution_state_sender,
221                chain_id,
222                refund_grant_to,
223                controller,
224                &action,
225                txn_tracker_moved,
226            );
227
228            async move {
229                let code = codes.next().await.expect("we send this immediately below");
230                runtime.preload_contract(application_id, code, description)?;
231                runtime.run_action(application_id, chain_id, action)
232            }
233        })
234        .await;
235
236        contract_runtime_task.send(code)?;
237
238        while let Some(request) = execution_state_receiver.next().await {
239            self.handle_request(request, resource_controller).await?;
240        }
241
242        let (result, controller, txn_tracker_moved) = contract_runtime_task.join().await?;
243
244        *txn_tracker = txn_tracker_moved;
245        txn_tracker.add_operation_result(result);
246
247        resource_controller
248            .with_state_and_grant(&mut self.system, grant)
249            .await?
250            .merge_balance(initial_balance, controller.balance()?)?;
251        resource_controller.tracker = controller.tracker;
252
253        Ok(())
254    }
255
256    pub async fn execute_operation(
257        &mut self,
258        context: OperationContext,
259        operation: Operation,
260        txn_tracker: &mut TransactionTracker,
261        resource_controller: &mut ResourceController<Option<AccountOwner>>,
262    ) -> Result<(), ExecutionError> {
263        assert_eq!(context.chain_id, self.context().extra().chain_id());
264        match operation {
265            Operation::System(op) => {
266                let new_application = self
267                    .system
268                    .execute_operation(context, *op, txn_tracker, resource_controller)
269                    .await?;
270                if let Some((application_id, argument)) = new_application {
271                    let user_action = UserAction::Instantiate(context, argument);
272                    self.run_user_action(
273                        application_id,
274                        user_action,
275                        context.refund_grant_to(),
276                        None,
277                        txn_tracker,
278                        resource_controller,
279                    )
280                    .await?;
281                }
282            }
283            Operation::User {
284                application_id,
285                bytes,
286            } => {
287                self.run_user_action(
288                    application_id,
289                    UserAction::Operation(context, bytes),
290                    context.refund_grant_to(),
291                    None,
292                    txn_tracker,
293                    resource_controller,
294                )
295                .await?;
296            }
297        }
298        Ok(())
299    }
300
301    pub async fn execute_message(
302        &mut self,
303        context: MessageContext,
304        message: Message,
305        grant: Option<&mut Amount>,
306        txn_tracker: &mut TransactionTracker,
307        resource_controller: &mut ResourceController<Option<AccountOwner>>,
308    ) -> Result<(), ExecutionError> {
309        assert_eq!(context.chain_id, self.context().extra().chain_id());
310        match message {
311            Message::System(message) => {
312                let outcome = self.system.execute_message(context, message).await?;
313                txn_tracker.add_outgoing_messages(outcome)?;
314            }
315            Message::User {
316                application_id,
317                bytes,
318            } => {
319                self.run_user_action(
320                    application_id,
321                    UserAction::Message(context, bytes),
322                    context.refund_grant_to,
323                    grant,
324                    txn_tracker,
325                    resource_controller,
326                )
327                .await?;
328            }
329        }
330        Ok(())
331    }
332
333    pub async fn bounce_message(
334        &self,
335        context: MessageContext,
336        grant: Amount,
337        message: Message,
338        txn_tracker: &mut TransactionTracker,
339    ) -> Result<(), ExecutionError> {
340        assert_eq!(context.chain_id, self.context().extra().chain_id());
341        txn_tracker.add_outgoing_message(OutgoingMessage {
342            destination: Destination::Recipient(context.message_id.chain_id),
343            authenticated_signer: context.authenticated_signer,
344            refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
345            grant,
346            kind: MessageKind::Bouncing,
347            message,
348        })?;
349        Ok(())
350    }
351
352    pub async fn send_refund(
353        &self,
354        context: MessageContext,
355        amount: Amount,
356        txn_tracker: &mut TransactionTracker,
357    ) -> Result<(), ExecutionError> {
358        assert_eq!(context.chain_id, self.context().extra().chain_id());
359        if amount.is_zero() {
360            return Ok(());
361        }
362        let Some(account) = context.refund_grant_to else {
363            return Err(ExecutionError::InternalError(
364                "Messages with grants should have a non-empty `refund_grant_to`",
365            ));
366        };
367        let message = SystemMessage::Credit {
368            amount,
369            source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
370            target: account.owner,
371        };
372        txn_tracker.add_outgoing_message(
373            OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
374        )?;
375        Ok(())
376    }
377
378    pub async fn query_application(
379        &mut self,
380        context: QueryContext,
381        query: Query,
382        endpoint: Option<&mut ServiceRuntimeEndpoint>,
383    ) -> Result<QueryOutcome, ExecutionError> {
384        assert_eq!(context.chain_id, self.context().extra().chain_id());
385        match query {
386            Query::System(query) => {
387                let outcome = self.system.handle_query(context, query).await?;
388                Ok(outcome.into())
389            }
390            Query::User {
391                application_id,
392                bytes,
393            } => {
394                let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
395                let outcome = match endpoint {
396                    Some(endpoint) => {
397                        self.query_user_application_with_long_lived_service(
398                            application_id,
399                            context,
400                            bytes,
401                            &mut endpoint.incoming_execution_requests,
402                            &mut endpoint.runtime_request_sender,
403                        )
404                        .await?
405                    }
406                    None => {
407                        self.query_user_application(application_id, context, bytes)
408                            .await?
409                    }
410                };
411                Ok(outcome.into())
412            }
413        }
414    }
415
416    async fn query_user_application(
417        &mut self,
418        application_id: ApplicationId,
419        context: QueryContext,
420        query: Vec<u8>,
421    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
422        let (execution_state_sender, mut execution_state_receiver) =
423            futures::channel::mpsc::unbounded();
424        let (code, description) = self.load_service(application_id, None).await?;
425
426        let service_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
427            let mut runtime = ServiceSyncRuntime::new(execution_state_sender, context);
428
429            async move {
430                let code = codes.next().await.expect("we send this immediately below");
431                runtime.preload_service(application_id, code, description)?;
432                runtime.run_query(application_id, query)
433            }
434        })
435        .await;
436
437        service_runtime_task.send(code)?;
438
439        while let Some(request) = execution_state_receiver.next().await {
440            self.handle_request(request, &mut ResourceController::default())
441                .await?;
442        }
443
444        service_runtime_task.join().await
445    }
446
447    async fn query_user_application_with_long_lived_service(
448        &mut self,
449        application_id: ApplicationId,
450        context: QueryContext,
451        query: Vec<u8>,
452        incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
453            ExecutionRequest,
454        >,
455        runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
456    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
457        let (outcome_sender, outcome_receiver) = oneshot::channel();
458        let mut outcome_receiver = outcome_receiver.fuse();
459
460        runtime_request_sender
461            .send(ServiceRuntimeRequest::Query {
462                application_id,
463                context,
464                query,
465                callback: outcome_sender,
466            })
467            .expect("Service runtime thread should only stop when `request_sender` is dropped");
468
469        loop {
470            futures::select! {
471                maybe_request = incoming_execution_requests.next() => {
472                    if let Some(request) = maybe_request {
473                        self.handle_request(request, &mut ResourceController::default()).await?;
474                    }
475                }
476                outcome = &mut outcome_receiver => {
477                    return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
478                }
479            }
480        }
481    }
482
483    pub async fn list_applications(
484        &self,
485    ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
486        let mut applications = vec![];
487        for app_id in self.users.indices().await? {
488            let blob_id = app_id.description_blob_id();
489            let blob_content = self.system.read_blob_content(blob_id).await?;
490            let application_description = bcs::from_bytes(blob_content.bytes())?;
491            applications.push((app_id, application_description));
492        }
493        Ok(applications)
494    }
495}