linera_execution/
execution.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, vec};
5
6use allocative::Allocative;
7use futures::{FutureExt, StreamExt};
8use linera_base::{
9    crypto::CryptoHash,
10    data_types::{BlobContent, BlockHeight, StreamUpdate},
11    identifiers::{AccountOwner, BlobId, StreamId},
12    time::Instant,
13};
14use linera_views::{
15    context::Context,
16    key_value_store_view::KeyValueStoreView,
17    map_view::MapView,
18    reentrant_collection_view::HashedReentrantCollectionView,
19    views::{ClonableView, HashableView as _, ReplaceContext, View},
20    ViewError,
21};
22use linera_views_derive::HashableView;
23#[cfg(with_testing)]
24use {
25    crate::{
26        ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
27    },
28    linera_base::data_types::Blob,
29    linera_views::context::MemoryContext,
30    std::sync::Arc,
31};
32
33use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
34use crate::{
35    execution_state_actor::ExecutionStateActor, resources::ResourceController,
36    system::SystemExecutionStateView, ApplicationDescription, ApplicationId, BcsHashable,
37    Deserialize, ExecutionError, ExecutionRuntimeContext, JsVec, MessageContext, OperationContext,
38    ProcessStreamsContext, Query, QueryContext, QueryOutcome, Serialize, ServiceSyncRuntime,
39    Timestamp, TransactionTracker, FLAG_ZERO_HASH,
40};
41
/// A view accessing the execution state of a chain.
///
/// The state is split into the system application's state, one key-value store per user
/// application, and per-stream event counters.
#[derive(Debug, ClonableView, HashableView, Allocative)]
#[allocative(bound = "C")]
pub struct ExecutionStateView<C> {
    /// System application.
    pub system: SystemExecutionStateView<C>,
    /// User applications: one key-value store per application, indexed by `ApplicationId`.
    pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
    /// The number of events in the streams that this chain is writing to, indexed by
    /// `StreamId`.
    pub stream_event_counts: MapView<C, StreamId, u32>,
}
53
54impl<C> ExecutionStateView<C>
55where
56    C: Context + Clone + 'static,
57    C::Extra: ExecutionRuntimeContext,
58{
59    pub async fn crypto_hash_mut(&mut self) -> Result<CryptoHash, ViewError> {
60        if self
61            .system
62            .current_committee()
63            .is_some_and(|(_epoch, committee)| {
64                committee
65                    .policy()
66                    .http_request_allow_list
67                    .contains(FLAG_ZERO_HASH)
68            })
69        {
70            Ok(CryptoHash::from([0; 32]))
71        } else {
72            #[derive(Serialize, Deserialize)]
73            struct ExecutionStateViewHash([u8; 32]);
74            impl BcsHashable<'_> for ExecutionStateViewHash {}
75            self.hash_mut()
76                .await
77                .map(|hash| CryptoHash::new(&ExecutionStateViewHash(hash.into())))
78        }
79    }
80}
81
82impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
83    type Target = ExecutionStateView<C2>;
84
85    async fn with_context(
86        &mut self,
87        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
88    ) -> Self::Target {
89        ExecutionStateView {
90            system: self.system.with_context(ctx.clone()).await,
91            users: self.users.with_context(ctx.clone()).await,
92            stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
93        }
94    }
95}
96
/// How to interact with a long-lived service runtime.
///
/// Queries are sent to the runtime through `runtime_request_sender`, and the execution
/// requests the runtime issues while serving them arrive on `incoming_execution_requests`.
pub struct ServiceRuntimeEndpoint {
    /// How to receive requests.
    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
    /// How to query the runtime.
    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
}
104
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + 'static,
{
    /// Simulates the instantiation of an application.
    ///
    /// Marks the application description blob, the contract blob and the service blob as
    /// used, registers `contract` and the three blobs with the test runtime context, and
    /// then runs a `UserAction::Instantiate` for the application under a no-fee resource
    /// policy.
    ///
    /// # Panics
    ///
    /// Panics if the creator chain of `application_description` is not the chain of this
    /// view's runtime context.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while recording the blobs or running the instantiation.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let creator_chain = application_description.creator_chain_id;
        assert_eq!(creator_chain, self.context().extra().chain_id);

        let application_id = ApplicationId::from(&application_description);
        let description_blob = Blob::new_application_description(&application_description);

        // Record the description, contract and service blobs as used, in that order.
        for blob_id in [
            description_blob.id(),
            contract_blob.id(),
            service_blob.id(),
        ] {
            self.system.used_blobs.insert(&blob_id)?;
        }

        // Make the contract code and the blobs available through the test runtime context.
        self.context()
            .extra()
            .user_contracts()
            .pin()
            .insert(application_id, contract);
        self.context()
            .extra()
            .add_blobs([
                contract_blob,
                service_blob,
                Blob::new_application_description(&application_description),
            ])
            .await?;

        // The instantiation runs as an unsigned operation at the description's block height.
        let operation_context = OperationContext {
            chain_id: creator_chain,
            authenticated_signer: None,
            height: application_description.block_height,
            round: None,
            timestamp: local_time,
        };
        let action = UserAction::Instantiate(operation_context, instantiation_argument);

        let mut resource_controller = ResourceController::new(
            Arc::new(ResourceControlPolicy::no_fees()),
            ResourceTracker::default(),
            None,
        );
        let mut txn_tracker = TransactionTracker::new(
            local_time,
            0,
            // Next application index: the one right after the application being instantiated.
            application_description.application_index + 1,
            // Next chain index.
            0,
            None,
            &[],
        );
        txn_tracker.add_created_blob(description_blob);

        Box::pin(
            ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
                .run_user_action(
                    application_id,
                    action,
                    operation_context.refund_grant_to(),
                    None,
                ),
        )
        .await?;

        Ok(())
    }
}
177
/// An action to run in a user application, paired with the context it runs in.
pub enum UserAction {
    /// Instantiate the application: the operation context and the instantiation argument
    /// bytes.
    Instantiate(OperationContext, Vec<u8>),
    /// An operation: the operation context and a byte payload (presumably the serialized
    /// operation; confirm against the callers).
    Operation(OperationContext, Vec<u8>),
    /// A message: the message context and a byte payload (presumably the serialized
    /// message; confirm against the callers).
    Message(MessageContext, Vec<u8>),
    /// Stream processing: the stream-processing context and the stream updates to handle.
    ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
}
184
185impl UserAction {
186    pub(crate) fn signer(&self) -> Option<AccountOwner> {
187        match self {
188            UserAction::Instantiate(context, _) => context.authenticated_signer,
189            UserAction::Operation(context, _) => context.authenticated_signer,
190            UserAction::ProcessStreams(_, _) => None,
191            UserAction::Message(context, _) => context.authenticated_signer,
192        }
193    }
194
195    pub(crate) fn height(&self) -> BlockHeight {
196        match self {
197            UserAction::Instantiate(context, _) => context.height,
198            UserAction::Operation(context, _) => context.height,
199            UserAction::ProcessStreams(context, _) => context.height,
200            UserAction::Message(context, _) => context.height,
201        }
202    }
203
204    pub(crate) fn round(&self) -> Option<u32> {
205        match self {
206            UserAction::Instantiate(context, _) => context.round,
207            UserAction::Operation(context, _) => context.round,
208            UserAction::ProcessStreams(context, _) => context.round,
209            UserAction::Message(context, _) => context.round,
210        }
211    }
212
213    pub(crate) fn timestamp(&self) -> Timestamp {
214        match self {
215            UserAction::Instantiate(context, _) => context.timestamp,
216            UserAction::Operation(context, _) => context.timestamp,
217            UserAction::ProcessStreams(context, _) => context.timestamp,
218            UserAction::Message(context, _) => context.timestamp,
219        }
220    }
221}
222
223impl<C> ExecutionStateView<C>
224where
225    C: Context + Clone + 'static,
226    C::Extra: ExecutionRuntimeContext,
227{
228    pub async fn query_application(
229        &mut self,
230        context: QueryContext,
231        query: Query,
232        endpoint: Option<&mut ServiceRuntimeEndpoint>,
233    ) -> Result<QueryOutcome, ExecutionError> {
234        assert_eq!(context.chain_id, self.context().extra().chain_id());
235        match query {
236            Query::System(query) => {
237                let outcome = self.system.handle_query(context, query).await?;
238                Ok(outcome.into())
239            }
240            Query::User {
241                application_id,
242                bytes,
243            } => {
244                let outcome = match endpoint {
245                    Some(endpoint) => {
246                        self.query_user_application_with_long_lived_service(
247                            application_id,
248                            context,
249                            bytes,
250                            &mut endpoint.incoming_execution_requests,
251                            &mut endpoint.runtime_request_sender,
252                        )
253                        .await?
254                    }
255                    None => {
256                        self.query_user_application(application_id, context, bytes)
257                            .await?
258                    }
259                };
260                Ok(outcome.into())
261            }
262        }
263    }
264
265    async fn query_user_application(
266        &mut self,
267        application_id: ApplicationId,
268        context: QueryContext,
269        query: Vec<u8>,
270    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
271        self.query_user_application_with_deadline(
272            application_id,
273            context,
274            query,
275            None,
276            BTreeMap::new(),
277        )
278        .await
279    }
280
281    pub(crate) async fn query_user_application_with_deadline(
282        &mut self,
283        application_id: ApplicationId,
284        context: QueryContext,
285        query: Vec<u8>,
286        deadline: Option<Instant>,
287        created_blobs: BTreeMap<BlobId, BlobContent>,
288    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
289        let (execution_state_sender, mut execution_state_receiver) =
290            futures::channel::mpsc::unbounded();
291        let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
292        let mut resource_controller = ResourceController::default();
293        let thread_pool = self.context().extra().thread_pool().clone();
294        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
295
296        let (codes, descriptions) = actor.service_and_dependencies(application_id).await?;
297
298        let service_runtime_task = thread_pool
299            .run_send(JsVec(codes), move |codes| async move {
300                let mut runtime = ServiceSyncRuntime::new_with_deadline(
301                    execution_state_sender,
302                    context,
303                    deadline,
304                );
305
306                for (code, description) in codes.0.into_iter().zip(descriptions) {
307                    runtime.preload_service(
308                        ApplicationId::from(&description),
309                        code,
310                        description,
311                    )?;
312                }
313
314                runtime.run_query(application_id, query)
315            })
316            .await;
317
318        while let Some(request) = execution_state_receiver.next().await {
319            actor.handle_request(request).await?;
320        }
321
322        service_runtime_task.await?
323    }
324
325    async fn query_user_application_with_long_lived_service(
326        &mut self,
327        application_id: ApplicationId,
328        context: QueryContext,
329        query: Vec<u8>,
330        incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
331            ExecutionRequest,
332        >,
333        runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
334    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
335        let (outcome_sender, outcome_receiver) = oneshot::channel();
336        let mut outcome_receiver = outcome_receiver.fuse();
337
338        runtime_request_sender
339            .send(ServiceRuntimeRequest::Query {
340                application_id,
341                context,
342                query,
343                callback: outcome_sender,
344            })
345            .expect("Service runtime thread should only stop when `request_sender` is dropped");
346
347        let mut txn_tracker = TransactionTracker::default();
348        let mut resource_controller = ResourceController::default();
349        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
350
351        loop {
352            futures::select! {
353                maybe_request = incoming_execution_requests.next() => {
354                    if let Some(request) = maybe_request {
355                        actor.handle_request(request).await?;
356                    }
357                }
358                outcome = &mut outcome_receiver => {
359                    return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
360                }
361            }
362        }
363    }
364
365    pub async fn list_applications(
366        &self,
367    ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
368        let mut applications = vec![];
369        for app_id in self.users.indices().await? {
370            let blob_id = app_id.description_blob_id();
371            let blob_content = self.system.read_blob_content(blob_id).await?;
372            let application_description = bcs::from_bytes(blob_content.bytes())?;
373            applications.push((app_id, application_description));
374        }
375        Ok(applications)
376    }
377}