1use std::{collections::BTreeMap, vec};
5
6use futures::{FutureExt, StreamExt};
7use linera_base::{
8    data_types::{BlobContent, BlockHeight, StreamUpdate},
9    identifiers::{AccountOwner, BlobId, StreamId},
10    time::Instant,
11};
12use linera_views::{
13    context::Context,
14    key_value_store_view::KeyValueStoreView,
15    map_view::MapView,
16    reentrant_collection_view::HashedReentrantCollectionView,
17    views::{ClonableView, ReplaceContext, View},
18};
19use linera_views_derive::CryptoHashView;
20#[cfg(with_testing)]
21use {
22    crate::{
23        ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
24    },
25    linera_base::data_types::Blob,
26    linera_views::context::MemoryContext,
27    std::sync::Arc,
28};
29
30use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
31use crate::{
32    execution_state_actor::ExecutionStateActor, resources::ResourceController,
33    system::SystemExecutionStateView, ApplicationDescription, ApplicationId, ExecutionError,
34    ExecutionRuntimeConfig, ExecutionRuntimeContext, MessageContext, OperationContext,
35    ProcessStreamsContext, Query, QueryContext, QueryOutcome, ServiceSyncRuntime, Timestamp,
36    TransactionTracker,
37};
38
39#[derive(Debug, ClonableView, CryptoHashView)]
41pub struct ExecutionStateView<C> {
42    pub system: SystemExecutionStateView<C>,
44    pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
46    pub stream_event_counts: MapView<C, StreamId, u32>,
48}
49
50impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
51    type Target = ExecutionStateView<C2>;
52
53    async fn with_context(
54        &mut self,
55        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
56    ) -> Self::Target {
57        ExecutionStateView {
58            system: self.system.with_context(ctx.clone()).await,
59            users: self.users.with_context(ctx.clone()).await,
60            stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
61        }
62    }
63}
64
65pub struct ServiceRuntimeEndpoint {
67    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
69    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
71}
72
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
{
    /// Test helper: registers an application in this view's test context and runs its
    /// `Instantiate` user action.
    ///
    /// The description, contract and service blobs are recorded in `system.used_blobs` and
    /// added to the test runtime context, `contract` is registered under the application's
    /// ID, and the action is then executed with a fee-less resource policy.
    ///
    /// # Panics
    ///
    /// Panics if the application's creator chain is not the chain of this view's context.
    ///
    /// # Errors
    ///
    /// Propagates any failure from recording the blobs, adding them to the test context,
    /// or running the user action.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let chain_id = application_description.creator_chain_id;
        assert_eq!(chain_id, self.context().extra().chain_id);

        // Values derived from the description.
        let application_id: ApplicationId = (&application_description).into();
        let description_blob = Blob::new_application_description(&application_description);
        let following_application_index = application_description.application_index + 1;
        let first_chain_index = 0;

        // The instantiation runs as an unsigned operation at the description's block height.
        let context = OperationContext {
            chain_id,
            authenticated_signer: None,
            height: application_description.block_height,
            round: None,
            timestamp: local_time,
        };
        let action = UserAction::Instantiate(context, instantiation_argument);

        // Mark the three blobs as used: description first, then contract, then service.
        for blob_id in [
            description_blob.id(),
            contract_blob.id(),
            service_blob.id(),
        ] {
            self.system.used_blobs.insert(&blob_id)?;
        }

        let test_context = self.context();
        test_context
            .extra()
            .user_contracts()
            .insert(application_id, contract);

        let published_blobs = [
            contract_blob,
            service_blob,
            Blob::new_application_description(&application_description),
        ];
        self.context().extra().add_blobs(published_blobs).await?;

        let mut resource_controller = ResourceController::new(
            Arc::new(ResourceControlPolicy::no_fees()),
            ResourceTracker::default(),
            None,
        );
        let mut txn_tracker = TransactionTracker::new(
            local_time,
            0,
            following_application_index,
            first_chain_index,
            None,
            &[],
        );
        txn_tracker.add_created_blob(description_blob);

        ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
            .run_user_action(application_id, action, context.refund_grant_to(), None)
            .await?;

        Ok(())
    }
}
142
143pub enum UserAction {
144    Instantiate(OperationContext, Vec<u8>),
145    Operation(OperationContext, Vec<u8>),
146    Message(MessageContext, Vec<u8>),
147    ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
148}
149
150impl UserAction {
151    pub(crate) fn signer(&self) -> Option<AccountOwner> {
152        match self {
153            UserAction::Instantiate(context, _) => context.authenticated_signer,
154            UserAction::Operation(context, _) => context.authenticated_signer,
155            UserAction::ProcessStreams(_, _) => None,
156            UserAction::Message(context, _) => context.authenticated_signer,
157        }
158    }
159
160    pub(crate) fn height(&self) -> BlockHeight {
161        match self {
162            UserAction::Instantiate(context, _) => context.height,
163            UserAction::Operation(context, _) => context.height,
164            UserAction::ProcessStreams(context, _) => context.height,
165            UserAction::Message(context, _) => context.height,
166        }
167    }
168
169    pub(crate) fn round(&self) -> Option<u32> {
170        match self {
171            UserAction::Instantiate(context, _) => context.round,
172            UserAction::Operation(context, _) => context.round,
173            UserAction::ProcessStreams(context, _) => context.round,
174            UserAction::Message(context, _) => context.round,
175        }
176    }
177
178    pub(crate) fn timestamp(&self) -> Timestamp {
179        match self {
180            UserAction::Instantiate(context, _) => context.timestamp,
181            UserAction::Operation(context, _) => context.timestamp,
182            UserAction::ProcessStreams(context, _) => context.timestamp,
183            UserAction::Message(context, _) => context.timestamp,
184        }
185    }
186}
187
188impl<C> ExecutionStateView<C>
189where
190    C: Context + Clone + Send + Sync + 'static,
191    C::Extra: ExecutionRuntimeContext,
192{
193    pub async fn query_application(
194        &mut self,
195        context: QueryContext,
196        query: Query,
197        endpoint: Option<&mut ServiceRuntimeEndpoint>,
198    ) -> Result<QueryOutcome, ExecutionError> {
199        assert_eq!(context.chain_id, self.context().extra().chain_id());
200        match query {
201            Query::System(query) => {
202                let outcome = self.system.handle_query(context, query).await?;
203                Ok(outcome.into())
204            }
205            Query::User {
206                application_id,
207                bytes,
208            } => {
209                let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
210                let outcome = match endpoint {
211                    Some(endpoint) => {
212                        self.query_user_application_with_long_lived_service(
213                            application_id,
214                            context,
215                            bytes,
216                            &mut endpoint.incoming_execution_requests,
217                            &mut endpoint.runtime_request_sender,
218                        )
219                        .await?
220                    }
221                    None => {
222                        self.query_user_application(application_id, context, bytes)
223                            .await?
224                    }
225                };
226                Ok(outcome.into())
227            }
228        }
229    }
230
231    async fn query_user_application(
232        &mut self,
233        application_id: ApplicationId,
234        context: QueryContext,
235        query: Vec<u8>,
236    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
237        self.query_user_application_with_deadline(
238            application_id,
239            context,
240            query,
241            None,
242            BTreeMap::new(),
243        )
244        .await
245    }
246
247    pub(crate) async fn query_user_application_with_deadline(
248        &mut self,
249        application_id: ApplicationId,
250        context: QueryContext,
251        query: Vec<u8>,
252        deadline: Option<Instant>,
253        created_blobs: BTreeMap<BlobId, BlobContent>,
254    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
255        let (execution_state_sender, mut execution_state_receiver) =
256            futures::channel::mpsc::unbounded();
257        let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
258        let mut resource_controller = ResourceController::default();
259        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
260        let (code, description) = actor.load_service(application_id).await?;
261
262        let service_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
263            let mut runtime =
264                ServiceSyncRuntime::new_with_deadline(execution_state_sender, context, deadline);
265
266            async move {
267                let code = codes.next().await.expect("we send this immediately below");
268                runtime.preload_service(application_id, code, description)?;
269                runtime.run_query(application_id, query)
270            }
271        })
272        .await;
273
274        service_runtime_task.send(code)?;
275
276        while let Some(request) = execution_state_receiver.next().await {
277            actor.handle_request(request).await?;
278        }
279
280        service_runtime_task.join().await
281    }
282
283    async fn query_user_application_with_long_lived_service(
284        &mut self,
285        application_id: ApplicationId,
286        context: QueryContext,
287        query: Vec<u8>,
288        incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
289            ExecutionRequest,
290        >,
291        runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
292    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
293        let (outcome_sender, outcome_receiver) = oneshot::channel();
294        let mut outcome_receiver = outcome_receiver.fuse();
295
296        runtime_request_sender
297            .send(ServiceRuntimeRequest::Query {
298                application_id,
299                context,
300                query,
301                callback: outcome_sender,
302            })
303            .expect("Service runtime thread should only stop when `request_sender` is dropped");
304
305        let mut txn_tracker = TransactionTracker::default();
306        let mut resource_controller = ResourceController::default();
307        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
308
309        loop {
310            futures::select! {
311                maybe_request = incoming_execution_requests.next() => {
312                    if let Some(request) = maybe_request {
313                        actor.handle_request(request).await?;
314                    }
315                }
316                outcome = &mut outcome_receiver => {
317                    return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
318                }
319            }
320        }
321    }
322
323    pub async fn list_applications(
324        &self,
325    ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
326        let mut applications = vec![];
327        for app_id in self.users.indices().await? {
328            let blob_id = app_id.description_blob_id();
329            let blob_content = self.system.read_blob_content(blob_id).await?;
330            let application_description = bcs::from_bytes(blob_content.bytes())?;
331            applications.push((app_id, application_description));
332        }
333        Ok(applications)
334    }
335}