linera_execution/
execution.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, vec};
5
6use allocative::Allocative;
7use futures::{FutureExt, StreamExt};
8use linera_base::{
9    crypto::CryptoHash,
10    data_types::{BlobContent, BlockHeight, StreamUpdate},
11    identifiers::{AccountOwner, BlobId, StreamId},
12    time::Instant,
13};
14use linera_views::{
15    context::Context,
16    key_value_store_view::KeyValueStoreView,
17    map_view::MapView,
18    reentrant_collection_view::HashedReentrantCollectionView,
19    views::{ClonableView, HashableView as _, ReplaceContext, View},
20    ViewError,
21};
22use linera_views_derive::HashableView;
23#[cfg(with_testing)]
24use {
25    crate::{
26        ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
27    },
28    linera_base::data_types::Blob,
29    linera_views::context::MemoryContext,
30    std::sync::Arc,
31};
32
33use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
34use crate::{
35    execution_state_actor::ExecutionStateActor, resources::ResourceController,
36    system::SystemExecutionStateView, ApplicationDescription, ApplicationId, BcsHashable,
37    Deserialize, ExecutionError, ExecutionRuntimeConfig, ExecutionRuntimeContext, MessageContext,
38    OperationContext, ProcessStreamsContext, Query, QueryContext, QueryOutcome, Serialize,
39    ServiceSyncRuntime, Timestamp, TransactionTracker, FLAG_ZERO_HASH,
40};
41
/// A view accessing the execution state of a chain.
///
/// The state is made of three sub-views: the system application's state, one key-value
/// store per user application, and a per-stream event counter.
// NOTE(review): the derived view and hash implementations presumably depend on the names and
// order of the fields below — confirm before renaming or reordering them.
#[derive(Debug, ClonableView, HashableView, Allocative)]
#[allocative(bound = "C")]
pub struct ExecutionStateView<C> {
    /// System application.
    pub system: SystemExecutionStateView<C>,
    /// User applications: one key-value store view per application ID.
    pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
    /// The number of events in the streams that this chain is writing to, keyed by stream ID.
    pub stream_event_counts: MapView<C, StreamId, u32>,
}
53
54impl<C> ExecutionStateView<C>
55where
56    C: Context + Clone + Send + Sync + 'static,
57    C::Extra: ExecutionRuntimeContext,
58{
59    pub async fn crypto_hash_mut(&mut self) -> Result<CryptoHash, ViewError> {
60        if self
61            .system
62            .current_committee()
63            .is_some_and(|(_epoch, committee)| {
64                committee
65                    .policy()
66                    .http_request_allow_list
67                    .contains(FLAG_ZERO_HASH)
68            })
69        {
70            Ok(CryptoHash::from([0; 32]))
71        } else {
72            #[derive(Serialize, Deserialize)]
73            struct ExecutionStateViewHash([u8; 32]);
74            impl BcsHashable<'_> for ExecutionStateViewHash {}
75            self.hash_mut()
76                .await
77                .map(|hash| CryptoHash::new(&ExecutionStateViewHash(hash.into())))
78        }
79    }
80}
81
82impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
83    type Target = ExecutionStateView<C2>;
84
85    async fn with_context(
86        &mut self,
87        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
88    ) -> Self::Target {
89        ExecutionStateView {
90            system: self.system.with_context(ctx.clone()).await,
91            users: self.users.with_context(ctx.clone()).await,
92            stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
93        }
94    }
95}
96
/// How to interact with a long-lived service runtime.
///
/// Bundles the two channel ends that `ExecutionStateView::query_application` uses when it is
/// given an endpoint instead of spawning a runtime for the query.
pub struct ServiceRuntimeEndpoint {
    /// How to receive requests: the [`ExecutionRequest`]s that are handled against the
    /// execution state while a query is in flight.
    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
    /// How to query the runtime: the sender on which [`ServiceRuntimeRequest`]s are submitted.
    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
}
104
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
{
    /// Simulates the instantiation of an application.
    ///
    /// Marks the description, contract and service blobs as used, registers `contract` and
    /// the blobs with the test runtime context, and then runs a
    /// [`UserAction::Instantiate`] with `instantiation_argument` under a fee-less resource
    /// policy. The action runs with no authenticated signer and no round, at the block
    /// height recorded in `application_description` and at time `local_time`.
    ///
    /// # Errors
    ///
    /// Propagates any failure from recording the blobs or from running the action.
    ///
    /// # Panics
    ///
    /// Panics if the application's creator chain is not the chain of this view's context.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let creator_chain = application_description.creator_chain_id;
        assert_eq!(creator_chain, self.context().extra().chain_id);

        let operation_context = OperationContext {
            chain_id: creator_chain,
            authenticated_signer: None,
            height: application_description.block_height,
            round: None,
            timestamp: local_time,
        };
        let action = UserAction::Instantiate(operation_context, instantiation_argument);

        // Indices handed to the transaction tracker below.
        let next_application_index = application_description.application_index + 1;
        let next_chain_index = 0;

        let application_id: ApplicationId = From::from(&application_description);
        let description_blob = Blob::new_application_description(&application_description);

        // Record the three blobs as used, in the order: description, contract, service.
        for blob_id in [
            description_blob.id(),
            contract_blob.id(),
            service_blob.id(),
        ] {
            self.system.used_blobs.insert(&blob_id)?;
        }

        let runtime_context = self.context().extra();
        runtime_context
            .user_contracts()
            .pin()
            .insert(application_id, contract);

        self.context()
            .extra()
            .add_blobs([
                contract_blob,
                service_blob,
                Blob::new_application_description(&application_description),
            ])
            .await?;

        let mut resource_controller = ResourceController::new(
            Arc::new(ResourceControlPolicy::no_fees()),
            ResourceTracker::default(),
            None,
        );
        let mut txn_tracker = TransactionTracker::new(
            local_time,
            0,
            next_application_index,
            next_chain_index,
            None,
            &[],
        );
        txn_tracker.add_created_blob(description_blob);

        // Kept as one expression: the actor is a temporary that must live until the boxed
        // future has been awaited.
        Box::pin(
            ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
                .run_user_action(
                    application_id,
                    action,
                    operation_context.refund_grant_to(),
                    None,
                ),
        )
        .await?;

        Ok(())
    }
}
177
/// An action to run in a user application, paired with the context it runs under.
pub enum UserAction {
    /// Instantiate the application; the bytes are the instantiation argument.
    Instantiate(OperationContext, Vec<u8>),
    /// Run an operation, given as raw bytes, under an operation context.
    Operation(OperationContext, Vec<u8>),
    /// Handle a message, given as raw bytes, under a message context.
    Message(MessageContext, Vec<u8>),
    /// Process a batch of stream updates under a stream-processing context.
    ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
}
184
185impl UserAction {
186    pub(crate) fn signer(&self) -> Option<AccountOwner> {
187        match self {
188            UserAction::Instantiate(context, _) => context.authenticated_signer,
189            UserAction::Operation(context, _) => context.authenticated_signer,
190            UserAction::ProcessStreams(_, _) => None,
191            UserAction::Message(context, _) => context.authenticated_signer,
192        }
193    }
194
195    pub(crate) fn height(&self) -> BlockHeight {
196        match self {
197            UserAction::Instantiate(context, _) => context.height,
198            UserAction::Operation(context, _) => context.height,
199            UserAction::ProcessStreams(context, _) => context.height,
200            UserAction::Message(context, _) => context.height,
201        }
202    }
203
204    pub(crate) fn round(&self) -> Option<u32> {
205        match self {
206            UserAction::Instantiate(context, _) => context.round,
207            UserAction::Operation(context, _) => context.round,
208            UserAction::ProcessStreams(context, _) => context.round,
209            UserAction::Message(context, _) => context.round,
210        }
211    }
212
213    pub(crate) fn timestamp(&self) -> Timestamp {
214        match self {
215            UserAction::Instantiate(context, _) => context.timestamp,
216            UserAction::Operation(context, _) => context.timestamp,
217            UserAction::ProcessStreams(context, _) => context.timestamp,
218            UserAction::Message(context, _) => context.timestamp,
219        }
220    }
221}
222
223impl<C> ExecutionStateView<C>
224where
225    C: Context + Clone + Send + Sync + 'static,
226    C::Extra: ExecutionRuntimeContext,
227{
228    pub async fn query_application(
229        &mut self,
230        context: QueryContext,
231        query: Query,
232        endpoint: Option<&mut ServiceRuntimeEndpoint>,
233    ) -> Result<QueryOutcome, ExecutionError> {
234        assert_eq!(context.chain_id, self.context().extra().chain_id());
235        match query {
236            Query::System(query) => {
237                let outcome = self.system.handle_query(context, query).await?;
238                Ok(outcome.into())
239            }
240            Query::User {
241                application_id,
242                bytes,
243            } => {
244                let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
245                let outcome = match endpoint {
246                    Some(endpoint) => {
247                        self.query_user_application_with_long_lived_service(
248                            application_id,
249                            context,
250                            bytes,
251                            &mut endpoint.incoming_execution_requests,
252                            &mut endpoint.runtime_request_sender,
253                        )
254                        .await?
255                    }
256                    None => {
257                        self.query_user_application(application_id, context, bytes)
258                            .await?
259                    }
260                };
261                Ok(outcome.into())
262            }
263        }
264    }
265
266    async fn query_user_application(
267        &mut self,
268        application_id: ApplicationId,
269        context: QueryContext,
270        query: Vec<u8>,
271    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
272        self.query_user_application_with_deadline(
273            application_id,
274            context,
275            query,
276            None,
277            BTreeMap::new(),
278        )
279        .await
280    }
281
282    pub(crate) async fn query_user_application_with_deadline(
283        &mut self,
284        application_id: ApplicationId,
285        context: QueryContext,
286        query: Vec<u8>,
287        deadline: Option<Instant>,
288        created_blobs: BTreeMap<BlobId, BlobContent>,
289    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
290        let (execution_state_sender, mut execution_state_receiver) =
291            futures::channel::mpsc::unbounded();
292        let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
293        let mut resource_controller = ResourceController::default();
294        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
295        let (code, description) = actor.load_service(application_id).await?;
296
297        let service_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
298            let mut runtime =
299                ServiceSyncRuntime::new_with_deadline(execution_state_sender, context, deadline);
300
301            async move {
302                let code = codes.next().await.expect("we send this immediately below");
303                runtime.preload_service(application_id, code, description)?;
304                runtime.run_query(application_id, query)
305            }
306        })
307        .await;
308
309        service_runtime_task.send(code)?;
310
311        while let Some(request) = execution_state_receiver.next().await {
312            actor.handle_request(request).await?;
313        }
314
315        service_runtime_task.join().await
316    }
317
318    async fn query_user_application_with_long_lived_service(
319        &mut self,
320        application_id: ApplicationId,
321        context: QueryContext,
322        query: Vec<u8>,
323        incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
324            ExecutionRequest,
325        >,
326        runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
327    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
328        let (outcome_sender, outcome_receiver) = oneshot::channel();
329        let mut outcome_receiver = outcome_receiver.fuse();
330
331        runtime_request_sender
332            .send(ServiceRuntimeRequest::Query {
333                application_id,
334                context,
335                query,
336                callback: outcome_sender,
337            })
338            .expect("Service runtime thread should only stop when `request_sender` is dropped");
339
340        let mut txn_tracker = TransactionTracker::default();
341        let mut resource_controller = ResourceController::default();
342        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
343
344        loop {
345            futures::select! {
346                maybe_request = incoming_execution_requests.next() => {
347                    if let Some(request) = maybe_request {
348                        actor.handle_request(request).await?;
349                    }
350                }
351                outcome = &mut outcome_receiver => {
352                    return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
353                }
354            }
355        }
356    }
357
358    pub async fn list_applications(
359        &self,
360    ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
361        let mut applications = vec![];
362        for app_id in self.users.indices().await? {
363            let blob_id = app_id.description_blob_id();
364            let blob_content = self.system.read_blob_content(blob_id).await?;
365            let application_description = bcs::from_bytes(blob_content.bytes())?;
366            applications.push((app_id, application_description));
367        }
368        Ok(applications)
369    }
370}