1use std::{collections::BTreeMap, vec};
5
6use futures::{FutureExt, StreamExt};
7use linera_base::{
8 data_types::{BlobContent, BlockHeight, StreamUpdate},
9 identifiers::{AccountOwner, BlobId, StreamId},
10 time::Instant,
11};
12use linera_views::{
13 context::Context,
14 key_value_store_view::KeyValueStoreView,
15 map_view::MapView,
16 reentrant_collection_view::HashedReentrantCollectionView,
17 views::{ClonableView, ReplaceContext, View},
18};
19use linera_views_derive::CryptoHashView;
20#[cfg(with_testing)]
21use {
22 crate::{
23 ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
24 },
25 linera_base::data_types::Blob,
26 linera_views::context::MemoryContext,
27 std::sync::Arc,
28};
29
30use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
31use crate::{
32 execution_state_actor::ExecutionStateActor, resources::ResourceController,
33 system::SystemExecutionStateView, ApplicationDescription, ApplicationId, ExecutionError,
34 ExecutionRuntimeConfig, ExecutionRuntimeContext, MessageContext, OperationContext,
35 ProcessStreamsContext, Query, QueryContext, QueryOutcome, ServiceSyncRuntime, Timestamp,
36 TransactionTracker,
37};
38
39#[derive(Debug, ClonableView, CryptoHashView)]
41pub struct ExecutionStateView<C> {
42 pub system: SystemExecutionStateView<C>,
44 pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
46 pub stream_event_counts: MapView<C, StreamId, u32>,
48}
49
50impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
51 type Target = ExecutionStateView<C2>;
52
53 async fn with_context(
54 &mut self,
55 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
56 ) -> Self::Target {
57 ExecutionStateView {
58 system: self.system.with_context(ctx.clone()).await,
59 users: self.users.with_context(ctx.clone()).await,
60 stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
61 }
62 }
63}
64
65pub struct ServiceRuntimeEndpoint {
67 pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
69 pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
71}
72
73#[cfg(with_testing)]
74impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
75where
76 MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
77{
78 pub async fn simulate_instantiation(
80 &mut self,
81 contract: UserContractCode,
82 local_time: linera_base::data_types::Timestamp,
83 application_description: ApplicationDescription,
84 instantiation_argument: Vec<u8>,
85 contract_blob: Blob,
86 service_blob: Blob,
87 ) -> Result<(), ExecutionError> {
88 let chain_id = application_description.creator_chain_id;
89 assert_eq!(chain_id, self.context().extra().chain_id);
90 let context = OperationContext {
91 chain_id,
92 authenticated_signer: None,
93 height: application_description.block_height,
94 round: None,
95 timestamp: local_time,
96 };
97
98 let action = UserAction::Instantiate(context, instantiation_argument);
99 let next_application_index = application_description.application_index + 1;
100 let next_chain_index = 0;
101
102 let application_id = From::from(&application_description);
103 let blob = Blob::new_application_description(&application_description);
104
105 self.system.used_blobs.insert(&blob.id())?;
106 self.system.used_blobs.insert(&contract_blob.id())?;
107 self.system.used_blobs.insert(&service_blob.id())?;
108
109 self.context()
110 .extra()
111 .user_contracts()
112 .insert(application_id, contract);
113
114 self.context()
115 .extra()
116 .add_blobs([
117 contract_blob,
118 service_blob,
119 Blob::new_application_description(&application_description),
120 ])
121 .await?;
122
123 let tracker = ResourceTracker::default();
124 let policy = ResourceControlPolicy::no_fees();
125 let mut resource_controller = ResourceController::new(Arc::new(policy), tracker, None);
126 let mut txn_tracker = TransactionTracker::new(
127 local_time,
128 0,
129 next_application_index,
130 next_chain_index,
131 None,
132 &[],
133 );
134 txn_tracker.add_created_blob(blob);
135 ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
136 .run_user_action(application_id, action, context.refund_grant_to(), None)
137 .await?;
138
139 Ok(())
140 }
141}
142
143pub enum UserAction {
144 Instantiate(OperationContext, Vec<u8>),
145 Operation(OperationContext, Vec<u8>),
146 Message(MessageContext, Vec<u8>),
147 ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
148}
149
150impl UserAction {
151 pub(crate) fn signer(&self) -> Option<AccountOwner> {
152 match self {
153 UserAction::Instantiate(context, _) => context.authenticated_signer,
154 UserAction::Operation(context, _) => context.authenticated_signer,
155 UserAction::ProcessStreams(_, _) => None,
156 UserAction::Message(context, _) => context.authenticated_signer,
157 }
158 }
159
160 pub(crate) fn height(&self) -> BlockHeight {
161 match self {
162 UserAction::Instantiate(context, _) => context.height,
163 UserAction::Operation(context, _) => context.height,
164 UserAction::ProcessStreams(context, _) => context.height,
165 UserAction::Message(context, _) => context.height,
166 }
167 }
168
169 pub(crate) fn round(&self) -> Option<u32> {
170 match self {
171 UserAction::Instantiate(context, _) => context.round,
172 UserAction::Operation(context, _) => context.round,
173 UserAction::ProcessStreams(context, _) => context.round,
174 UserAction::Message(context, _) => context.round,
175 }
176 }
177
178 pub(crate) fn timestamp(&self) -> Timestamp {
179 match self {
180 UserAction::Instantiate(context, _) => context.timestamp,
181 UserAction::Operation(context, _) => context.timestamp,
182 UserAction::ProcessStreams(context, _) => context.timestamp,
183 UserAction::Message(context, _) => context.timestamp,
184 }
185 }
186}
187
188impl<C> ExecutionStateView<C>
189where
190 C: Context + Clone + Send + Sync + 'static,
191 C::Extra: ExecutionRuntimeContext,
192{
193 pub async fn query_application(
194 &mut self,
195 context: QueryContext,
196 query: Query,
197 endpoint: Option<&mut ServiceRuntimeEndpoint>,
198 ) -> Result<QueryOutcome, ExecutionError> {
199 assert_eq!(context.chain_id, self.context().extra().chain_id());
200 match query {
201 Query::System(query) => {
202 let outcome = self.system.handle_query(context, query).await?;
203 Ok(outcome.into())
204 }
205 Query::User {
206 application_id,
207 bytes,
208 } => {
209 let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
210 let outcome = match endpoint {
211 Some(endpoint) => {
212 self.query_user_application_with_long_lived_service(
213 application_id,
214 context,
215 bytes,
216 &mut endpoint.incoming_execution_requests,
217 &mut endpoint.runtime_request_sender,
218 )
219 .await?
220 }
221 None => {
222 self.query_user_application(application_id, context, bytes)
223 .await?
224 }
225 };
226 Ok(outcome.into())
227 }
228 }
229 }
230
231 async fn query_user_application(
232 &mut self,
233 application_id: ApplicationId,
234 context: QueryContext,
235 query: Vec<u8>,
236 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
237 self.query_user_application_with_deadline(
238 application_id,
239 context,
240 query,
241 None,
242 BTreeMap::new(),
243 )
244 .await
245 }
246
247 pub(crate) async fn query_user_application_with_deadline(
248 &mut self,
249 application_id: ApplicationId,
250 context: QueryContext,
251 query: Vec<u8>,
252 deadline: Option<Instant>,
253 created_blobs: BTreeMap<BlobId, BlobContent>,
254 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
255 let (execution_state_sender, mut execution_state_receiver) =
256 futures::channel::mpsc::unbounded();
257 let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
258 let mut resource_controller = ResourceController::default();
259 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
260 let (code, description) = actor.load_service(application_id).await?;
261
262 let service_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
263 let mut runtime =
264 ServiceSyncRuntime::new_with_deadline(execution_state_sender, context, deadline);
265
266 async move {
267 let code = codes.next().await.expect("we send this immediately below");
268 runtime.preload_service(application_id, code, description)?;
269 runtime.run_query(application_id, query)
270 }
271 })
272 .await;
273
274 service_runtime_task.send(code)?;
275
276 while let Some(request) = execution_state_receiver.next().await {
277 actor.handle_request(request).await?;
278 }
279
280 service_runtime_task.join().await
281 }
282
283 async fn query_user_application_with_long_lived_service(
284 &mut self,
285 application_id: ApplicationId,
286 context: QueryContext,
287 query: Vec<u8>,
288 incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
289 ExecutionRequest,
290 >,
291 runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
292 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
293 let (outcome_sender, outcome_receiver) = oneshot::channel();
294 let mut outcome_receiver = outcome_receiver.fuse();
295
296 runtime_request_sender
297 .send(ServiceRuntimeRequest::Query {
298 application_id,
299 context,
300 query,
301 callback: outcome_sender,
302 })
303 .expect("Service runtime thread should only stop when `request_sender` is dropped");
304
305 let mut txn_tracker = TransactionTracker::default();
306 let mut resource_controller = ResourceController::default();
307 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
308
309 loop {
310 futures::select! {
311 maybe_request = incoming_execution_requests.next() => {
312 if let Some(request) = maybe_request {
313 actor.handle_request(request).await?;
314 }
315 }
316 outcome = &mut outcome_receiver => {
317 return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
318 }
319 }
320 }
321 }
322
323 pub async fn list_applications(
324 &self,
325 ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
326 let mut applications = vec![];
327 for app_id in self.users.indices().await? {
328 let blob_id = app_id.description_blob_id();
329 let blob_content = self.system.read_blob_content(blob_id).await?;
330 let application_description = bcs::from_bytes(blob_content.bytes())?;
331 applications.push((app_id, application_description));
332 }
333 Ok(applications)
334 }
335}