1use std::{collections::BTreeMap, vec};
5
6use allocative::Allocative;
7use futures::{FutureExt, StreamExt};
8use linera_base::{
9 crypto::CryptoHash,
10 data_types::{BlobContent, BlockHeight, StreamUpdate},
11 identifiers::{AccountOwner, BlobId, StreamId},
12 time::Instant,
13};
14use linera_views::{
15 context::Context,
16 key_value_store_view::KeyValueStoreView,
17 map_view::MapView,
18 reentrant_collection_view::HashedReentrantCollectionView,
19 views::{ClonableView, HashableView as _, ReplaceContext, View},
20 ViewError,
21};
22use linera_views_derive::HashableView;
23#[cfg(with_testing)]
24use {
25 crate::{
26 ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
27 },
28 linera_base::data_types::Blob,
29 linera_views::context::MemoryContext,
30 std::sync::Arc,
31};
32
33use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
34use crate::{
35 execution_state_actor::ExecutionStateActor, resources::ResourceController,
36 system::SystemExecutionStateView, ApplicationDescription, ApplicationId, BcsHashable,
37 Deserialize, ExecutionError, ExecutionRuntimeContext, JsVec, MessageContext, OperationContext,
38 ProcessStreamsContext, Query, QueryContext, QueryOutcome, Serialize, ServiceSyncRuntime,
39 Timestamp, TransactionTracker, FLAG_ZERO_HASH,
40};
41
/// A view over the execution state, grouping the system state, the per-application
/// key-value stores and the per-stream counters.
#[derive(Debug, ClonableView, HashableView, Allocative)]
#[allocative(bound = "C")]
pub struct ExecutionStateView<C> {
    /// System-level state; it answers `Query::System` queries, reads blob contents and
    /// exposes the current committee.
    pub system: SystemExecutionStateView<C>,
    /// One key-value store per user application, indexed by [`ApplicationId`].
    pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
    /// A `u32` counter stored per [`StreamId`].
    // NOTE(review): presumably the number of events published on each stream -- confirm
    // against the code that writes to this map.
    pub stream_event_counts: MapView<C, StreamId, u32>,
}
53
54impl<C> ExecutionStateView<C>
55where
56 C: Context + Clone + 'static,
57 C::Extra: ExecutionRuntimeContext,
58{
59 pub async fn crypto_hash_mut(&mut self) -> Result<CryptoHash, ViewError> {
60 if self
61 .system
62 .current_committee()
63 .await?
64 .is_some_and(|(_epoch, committee)| {
65 committee
66 .policy()
67 .http_request_allow_list
68 .contains(FLAG_ZERO_HASH)
69 })
70 {
71 Ok(CryptoHash::from([0; 32]))
72 } else {
73 #[derive(Serialize, Deserialize)]
74 struct ExecutionStateViewHash([u8; 32]);
75 impl BcsHashable<'_> for ExecutionStateViewHash {}
76 self.hash_mut()
77 .await
78 .map(|hash| CryptoHash::new(&ExecutionStateViewHash(hash.into())))
79 }
80 }
81}
82
83impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
84 type Target = ExecutionStateView<C2>;
85
86 async fn with_context(
87 &mut self,
88 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
89 ) -> Self::Target {
90 ExecutionStateView {
91 system: self.system.with_context(ctx.clone()).await,
92 users: self.users.with_context(ctx.clone()).await,
93 stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
94 }
95 }
96}
97
/// The two channel ends used to talk to a long-lived service runtime when answering
/// user queries.
pub struct ServiceRuntimeEndpoint {
    /// Receives the [`ExecutionRequest`]s that must be handled against the execution
    /// state while a query is in flight.
    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
    /// Sends [`ServiceRuntimeRequest`]s (such as `ServiceRuntimeRequest::Query`) to the
    /// runtime.
    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
}
105
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + 'static,
{
    /// Test helper that registers an application and runs its instantiation action.
    ///
    /// The description, contract and service blobs are recorded as used blobs and added
    /// to the test runtime context, the contract code is registered under the
    /// application ID derived from `application_description`, and a
    /// `UserAction::Instantiate` carrying `instantiation_argument` is executed with a
    /// no-fee resource policy.
    ///
    /// # Panics
    ///
    /// Panics if the creator chain of `application_description` is not the chain of this
    /// view's runtime context.
    ///
    /// # Errors
    ///
    /// Returns an [`ExecutionError`] if recording the blobs, storing them or running the
    /// action fails.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let chain_id = application_description.creator_chain_id;
        assert_eq!(chain_id, self.context().extra().chain_id);

        // Instantiation runs as an unauthenticated operation at the height recorded in
        // the application description.
        let context = OperationContext {
            chain_id,
            authenticated_signer: None,
            height: application_description.block_height,
            round: None,
            timestamp: local_time,
        };
        let action = UserAction::Instantiate(context, instantiation_argument);
        let next_application_index = application_description.application_index + 1;

        let application_id = ApplicationId::from(&application_description);
        let description_blob = Blob::new_application_description(&application_description);

        // Mark the description, contract and service blobs as used, in that order.
        for blob_id in [
            description_blob.id(),
            contract_blob.id(),
            service_blob.id(),
        ] {
            self.system.used_blobs.insert(&blob_id)?;
        }

        self.context()
            .extra()
            .user_contracts()
            .pin()
            .insert(application_id, contract);

        let stored_blobs = [
            contract_blob,
            service_blob,
            Blob::new_application_description(&application_description),
        ];
        self.context().extra().add_blobs(stored_blobs).await?;

        let mut resource_controller = ResourceController::new(
            Arc::new(ResourceControlPolicy::no_fees()),
            ResourceTracker::default(),
            None,
        );
        let mut txn_tracker =
            TransactionTracker::new(local_time, 0, next_application_index, 0, None, &[]);
        txn_tracker.add_created_blob(description_blob);

        let refund_grant_to = context.refund_grant_to();
        Box::pin(
            ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
                .run_user_action(application_id, action, refund_grant_to, None),
        )
        .await?;

        Ok(())
    }
}
178
/// An action that can be run on a user application, together with the context it runs in.
pub enum UserAction {
    /// Instantiate the application with the given argument bytes.
    Instantiate(OperationContext, Vec<u8>),
    /// An operation, given as bytes, with its operation context.
    Operation(OperationContext, Vec<u8>),
    /// A message, given as bytes, with its message context.
    Message(MessageContext, Vec<u8>),
    /// A batch of stream updates, with its stream-processing context.
    ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
}
185
186impl UserAction {
187 pub(crate) fn signer(&self) -> Option<AccountOwner> {
188 match self {
189 UserAction::Instantiate(context, _) => context.authenticated_signer,
190 UserAction::Operation(context, _) => context.authenticated_signer,
191 UserAction::ProcessStreams(_, _) => None,
192 UserAction::Message(context, _) => context.authenticated_signer,
193 }
194 }
195
196 pub(crate) fn height(&self) -> BlockHeight {
197 match self {
198 UserAction::Instantiate(context, _) => context.height,
199 UserAction::Operation(context, _) => context.height,
200 UserAction::ProcessStreams(context, _) => context.height,
201 UserAction::Message(context, _) => context.height,
202 }
203 }
204
205 pub(crate) fn round(&self) -> Option<u32> {
206 match self {
207 UserAction::Instantiate(context, _) => context.round,
208 UserAction::Operation(context, _) => context.round,
209 UserAction::ProcessStreams(context, _) => context.round,
210 UserAction::Message(context, _) => context.round,
211 }
212 }
213
214 pub(crate) fn timestamp(&self) -> Timestamp {
215 match self {
216 UserAction::Instantiate(context, _) => context.timestamp,
217 UserAction::Operation(context, _) => context.timestamp,
218 UserAction::ProcessStreams(context, _) => context.timestamp,
219 UserAction::Message(context, _) => context.timestamp,
220 }
221 }
222}
223
224impl<C> ExecutionStateView<C>
225where
226 C: Context + Clone + 'static,
227 C::Extra: ExecutionRuntimeContext,
228{
229 pub async fn query_application(
230 &mut self,
231 context: QueryContext,
232 query: Query,
233 endpoint: Option<&mut ServiceRuntimeEndpoint>,
234 ) -> Result<QueryOutcome, ExecutionError> {
235 assert_eq!(context.chain_id, self.context().extra().chain_id());
236 match query {
237 Query::System(query) => {
238 let outcome = self.system.handle_query(context, query)?;
239 Ok(outcome.into())
240 }
241 Query::User {
242 application_id,
243 bytes,
244 } => {
245 let outcome = match endpoint {
246 Some(endpoint) => {
247 self.query_user_application_with_long_lived_service(
248 application_id,
249 context,
250 bytes,
251 &mut endpoint.incoming_execution_requests,
252 &mut endpoint.runtime_request_sender,
253 )
254 .await?
255 }
256 None => {
257 self.query_user_application(application_id, context, bytes)
258 .await?
259 }
260 };
261 Ok(outcome.into())
262 }
263 }
264 }
265
266 async fn query_user_application(
267 &mut self,
268 application_id: ApplicationId,
269 context: QueryContext,
270 query: Vec<u8>,
271 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
272 self.query_user_application_with_deadline(
273 application_id,
274 context,
275 query,
276 None,
277 BTreeMap::new(),
278 )
279 .await
280 }
281
282 pub(crate) async fn query_user_application_with_deadline(
283 &mut self,
284 application_id: ApplicationId,
285 context: QueryContext,
286 query: Vec<u8>,
287 deadline: Option<Instant>,
288 created_blobs: BTreeMap<BlobId, BlobContent>,
289 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
290 let (execution_state_sender, mut execution_state_receiver) =
291 futures::channel::mpsc::unbounded();
292 let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
293 let mut resource_controller = ResourceController::default();
294 let thread_pool = self.context().extra().thread_pool().clone();
295 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
296
297 let (codes, descriptions) = actor.service_and_dependencies(application_id).await?;
298
299 let service_runtime_task = thread_pool
300 .run_send(JsVec(codes), move |codes| async move {
301 let mut runtime = ServiceSyncRuntime::new_with_deadline(
302 execution_state_sender,
303 context,
304 deadline,
305 );
306
307 for (code, description) in codes.0.into_iter().zip(descriptions) {
308 runtime.preload_service(
309 ApplicationId::from(&description),
310 code,
311 description,
312 )?;
313 }
314
315 runtime.run_query(application_id, query)
316 })
317 .await;
318
319 while let Some(request) = execution_state_receiver.next().await {
320 actor.handle_request(request).await?;
321 }
322
323 service_runtime_task.await?
324 }
325
326 async fn query_user_application_with_long_lived_service(
327 &mut self,
328 application_id: ApplicationId,
329 context: QueryContext,
330 query: Vec<u8>,
331 incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
332 ExecutionRequest,
333 >,
334 runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
335 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
336 let (outcome_sender, outcome_receiver) = oneshot::channel();
337 let mut outcome_receiver = outcome_receiver.fuse();
338
339 runtime_request_sender
340 .send(ServiceRuntimeRequest::Query {
341 application_id,
342 context,
343 query,
344 callback: outcome_sender,
345 })
346 .expect("Service runtime thread should only stop when `request_sender` is dropped");
347
348 let mut txn_tracker = TransactionTracker::default();
349 let mut resource_controller = ResourceController::default();
350 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
351
352 loop {
353 futures::select! {
354 maybe_request = incoming_execution_requests.next() => {
355 if let Some(request) = maybe_request {
356 actor.handle_request(request).await?;
357 }
358 }
359 outcome = &mut outcome_receiver => {
360 return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
361 }
362 }
363 }
364 }
365
366 pub async fn list_applications(
367 &self,
368 ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
369 let mut applications = vec![];
370 for app_id in self.users.indices().await? {
371 let blob_id = app_id.description_blob_id();
372 let blob_content = self.system.read_blob_content(blob_id).await?;
373 let application_description = bcs::from_bytes(blob_content.bytes())?;
374 applications.push((app_id, application_description));
375 }
376 Ok(applications)
377 }
378}