1use std::{collections::BTreeMap, vec};
5
6use allocative::Allocative;
7use futures::{FutureExt, StreamExt};
8use linera_base::{
9 crypto::CryptoHash,
10 data_types::{BlobContent, BlockHeight, StreamUpdate},
11 identifiers::{AccountOwner, BlobId, StreamId},
12 time::Instant,
13};
14use linera_views::{
15 context::Context,
16 key_value_store_view::KeyValueStoreView,
17 map_view::MapView,
18 reentrant_collection_view::HashedReentrantCollectionView,
19 views::{ClonableView, HashableView as _, ReplaceContext, View},
20 ViewError,
21};
22use linera_views_derive::HashableView;
23#[cfg(with_testing)]
24use {
25 crate::{
26 ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
27 },
28 linera_base::data_types::Blob,
29 linera_views::context::MemoryContext,
30 std::sync::Arc,
31};
32
33use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
34use crate::{
35 execution_state_actor::ExecutionStateActor, resources::ResourceController,
36 system::SystemExecutionStateView, ApplicationDescription, ApplicationId, BcsHashable,
37 Deserialize, ExecutionError, ExecutionRuntimeConfig, ExecutionRuntimeContext, MessageContext,
38 OperationContext, ProcessStreamsContext, Query, QueryContext, QueryOutcome, Serialize,
39 ServiceSyncRuntime, Timestamp, TransactionTracker, FLAG_ZERO_HASH,
40};
41
42#[derive(Debug, ClonableView, HashableView, Allocative)]
44#[allocative(bound = "C")]
45pub struct ExecutionStateView<C> {
46 pub system: SystemExecutionStateView<C>,
48 pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
50 pub stream_event_counts: MapView<C, StreamId, u32>,
52}
53
54impl<C> ExecutionStateView<C>
55where
56 C: Context + Clone + Send + Sync + 'static,
57 C::Extra: ExecutionRuntimeContext,
58{
59 pub async fn crypto_hash_mut(&mut self) -> Result<CryptoHash, ViewError> {
60 if self
61 .system
62 .current_committee()
63 .is_some_and(|(_epoch, committee)| {
64 committee
65 .policy()
66 .http_request_allow_list
67 .contains(FLAG_ZERO_HASH)
68 })
69 {
70 Ok(CryptoHash::from([0; 32]))
71 } else {
72 #[derive(Serialize, Deserialize)]
73 struct ExecutionStateViewHash([u8; 32]);
74 impl BcsHashable<'_> for ExecutionStateViewHash {}
75 self.hash_mut()
76 .await
77 .map(|hash| CryptoHash::new(&ExecutionStateViewHash(hash.into())))
78 }
79 }
80}
81
82impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
83 type Target = ExecutionStateView<C2>;
84
85 async fn with_context(
86 &mut self,
87 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
88 ) -> Self::Target {
89 ExecutionStateView {
90 system: self.system.with_context(ctx.clone()).await,
91 users: self.users.with_context(ctx.clone()).await,
92 stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
93 }
94 }
95}
96
97pub struct ServiceRuntimeEndpoint {
99 pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
101 pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
103}
104
105#[cfg(with_testing)]
106impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
107where
108 MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
109{
110 pub async fn simulate_instantiation(
112 &mut self,
113 contract: UserContractCode,
114 local_time: linera_base::data_types::Timestamp,
115 application_description: ApplicationDescription,
116 instantiation_argument: Vec<u8>,
117 contract_blob: Blob,
118 service_blob: Blob,
119 ) -> Result<(), ExecutionError> {
120 let chain_id = application_description.creator_chain_id;
121 assert_eq!(chain_id, self.context().extra().chain_id);
122 let context = OperationContext {
123 chain_id,
124 authenticated_signer: None,
125 height: application_description.block_height,
126 round: None,
127 timestamp: local_time,
128 };
129
130 let action = UserAction::Instantiate(context, instantiation_argument);
131 let next_application_index = application_description.application_index + 1;
132 let next_chain_index = 0;
133
134 let application_id = From::from(&application_description);
135 let blob = Blob::new_application_description(&application_description);
136
137 self.system.used_blobs.insert(&blob.id())?;
138 self.system.used_blobs.insert(&contract_blob.id())?;
139 self.system.used_blobs.insert(&service_blob.id())?;
140
141 self.context()
142 .extra()
143 .user_contracts()
144 .pin()
145 .insert(application_id, contract);
146
147 self.context()
148 .extra()
149 .add_blobs([
150 contract_blob,
151 service_blob,
152 Blob::new_application_description(&application_description),
153 ])
154 .await?;
155
156 let tracker = ResourceTracker::default();
157 let policy = ResourceControlPolicy::no_fees();
158 let mut resource_controller = ResourceController::new(Arc::new(policy), tracker, None);
159 let mut txn_tracker = TransactionTracker::new(
160 local_time,
161 0,
162 next_application_index,
163 next_chain_index,
164 None,
165 &[],
166 );
167 txn_tracker.add_created_blob(blob);
168 Box::pin(
169 ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
170 .run_user_action(application_id, action, context.refund_grant_to(), None),
171 )
172 .await?;
173
174 Ok(())
175 }
176}
177
178pub enum UserAction {
179 Instantiate(OperationContext, Vec<u8>),
180 Operation(OperationContext, Vec<u8>),
181 Message(MessageContext, Vec<u8>),
182 ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
183}
184
185impl UserAction {
186 pub(crate) fn signer(&self) -> Option<AccountOwner> {
187 match self {
188 UserAction::Instantiate(context, _) => context.authenticated_signer,
189 UserAction::Operation(context, _) => context.authenticated_signer,
190 UserAction::ProcessStreams(_, _) => None,
191 UserAction::Message(context, _) => context.authenticated_signer,
192 }
193 }
194
195 pub(crate) fn height(&self) -> BlockHeight {
196 match self {
197 UserAction::Instantiate(context, _) => context.height,
198 UserAction::Operation(context, _) => context.height,
199 UserAction::ProcessStreams(context, _) => context.height,
200 UserAction::Message(context, _) => context.height,
201 }
202 }
203
204 pub(crate) fn round(&self) -> Option<u32> {
205 match self {
206 UserAction::Instantiate(context, _) => context.round,
207 UserAction::Operation(context, _) => context.round,
208 UserAction::ProcessStreams(context, _) => context.round,
209 UserAction::Message(context, _) => context.round,
210 }
211 }
212
213 pub(crate) fn timestamp(&self) -> Timestamp {
214 match self {
215 UserAction::Instantiate(context, _) => context.timestamp,
216 UserAction::Operation(context, _) => context.timestamp,
217 UserAction::ProcessStreams(context, _) => context.timestamp,
218 UserAction::Message(context, _) => context.timestamp,
219 }
220 }
221}
222
223impl<C> ExecutionStateView<C>
224where
225 C: Context + Clone + Send + Sync + 'static,
226 C::Extra: ExecutionRuntimeContext,
227{
228 pub async fn query_application(
229 &mut self,
230 context: QueryContext,
231 query: Query,
232 endpoint: Option<&mut ServiceRuntimeEndpoint>,
233 ) -> Result<QueryOutcome, ExecutionError> {
234 assert_eq!(context.chain_id, self.context().extra().chain_id());
235 match query {
236 Query::System(query) => {
237 let outcome = self.system.handle_query(context, query).await?;
238 Ok(outcome.into())
239 }
240 Query::User {
241 application_id,
242 bytes,
243 } => {
244 let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
245 let outcome = match endpoint {
246 Some(endpoint) => {
247 self.query_user_application_with_long_lived_service(
248 application_id,
249 context,
250 bytes,
251 &mut endpoint.incoming_execution_requests,
252 &mut endpoint.runtime_request_sender,
253 )
254 .await?
255 }
256 None => {
257 self.query_user_application(application_id, context, bytes)
258 .await?
259 }
260 };
261 Ok(outcome.into())
262 }
263 }
264 }
265
266 async fn query_user_application(
267 &mut self,
268 application_id: ApplicationId,
269 context: QueryContext,
270 query: Vec<u8>,
271 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
272 self.query_user_application_with_deadline(
273 application_id,
274 context,
275 query,
276 None,
277 BTreeMap::new(),
278 )
279 .await
280 }
281
282 pub(crate) async fn query_user_application_with_deadline(
283 &mut self,
284 application_id: ApplicationId,
285 context: QueryContext,
286 query: Vec<u8>,
287 deadline: Option<Instant>,
288 created_blobs: BTreeMap<BlobId, BlobContent>,
289 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
290 let (execution_state_sender, mut execution_state_receiver) =
291 futures::channel::mpsc::unbounded();
292 let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
293 let mut resource_controller = ResourceController::default();
294 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
295 let (code, description) = actor.load_service(application_id).await?;
296
297 let service_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
298 let mut runtime =
299 ServiceSyncRuntime::new_with_deadline(execution_state_sender, context, deadline);
300
301 async move {
302 let code = codes.next().await.expect("we send this immediately below");
303 runtime.preload_service(application_id, code, description)?;
304 runtime.run_query(application_id, query)
305 }
306 })
307 .await;
308
309 service_runtime_task.send(code)?;
310
311 while let Some(request) = execution_state_receiver.next().await {
312 actor.handle_request(request).await?;
313 }
314
315 service_runtime_task.join().await
316 }
317
318 async fn query_user_application_with_long_lived_service(
319 &mut self,
320 application_id: ApplicationId,
321 context: QueryContext,
322 query: Vec<u8>,
323 incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
324 ExecutionRequest,
325 >,
326 runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
327 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
328 let (outcome_sender, outcome_receiver) = oneshot::channel();
329 let mut outcome_receiver = outcome_receiver.fuse();
330
331 runtime_request_sender
332 .send(ServiceRuntimeRequest::Query {
333 application_id,
334 context,
335 query,
336 callback: outcome_sender,
337 })
338 .expect("Service runtime thread should only stop when `request_sender` is dropped");
339
340 let mut txn_tracker = TransactionTracker::default();
341 let mut resource_controller = ResourceController::default();
342 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
343
344 loop {
345 futures::select! {
346 maybe_request = incoming_execution_requests.next() => {
347 if let Some(request) = maybe_request {
348 actor.handle_request(request).await?;
349 }
350 }
351 outcome = &mut outcome_receiver => {
352 return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
353 }
354 }
355 }
356 }
357
358 pub async fn list_applications(
359 &self,
360 ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
361 let mut applications = vec![];
362 for app_id in self.users.indices().await? {
363 let blob_id = app_id.description_blob_id();
364 let blob_content = self.system.read_blob_content(blob_id).await?;
365 let application_description = bcs::from_bytes(blob_content.bytes())?;
366 applications.push((app_id, application_description));
367 }
368 Ok(applications)
369 }
370}