// exocore_apps_host/runtime/apps.rs

1use std::{path::PathBuf, sync::Arc, time::Duration};
2
3use anyhow::anyhow;
4use exocore_core::{
5    cell::Cell,
6    futures::{block_on, owned_spawn, sleep, spawn_blocking, spawn_future, BatchingStream},
7    time::Clock,
8    utils::backoff::BackoffCalculator,
9};
10use exocore_protos::{
11    apps::{in_message::InMessageType, out_message::OutMessageType, InMessage, OutMessage},
12    prost::Message,
13    store::{EntityQuery, MutationRequest},
14};
15use exocore_store::store::Store;
16use futures::{
17    channel::mpsc,
18    future::{pending, select_all, FutureExt},
19    lock::Mutex,
20    Future, SinkExt, StreamExt,
21};
22
23use super::wasmtime::WasmTimeRuntime;
24use crate::{Config, Error};
25
/// Buffer size of the host <-> application runtime message channels (both the
/// incoming and the outgoing direction).
const MSG_BUFFER_SIZE: usize = 5_000;
/// Batch size given to the `BatchingStream` that groups incoming messages
/// before they are handed to the runtime; each batch is followed by one tick.
const RUNTIME_MSG_BATCH_SIZE: usize = 1_000;
/// Delay before an application's first tick, and the fallback delay whenever
/// the runtime doesn't request a specific next tick time.
const APP_MIN_TICK_TIME: Duration = Duration::from_millis(100);
29
30/// Exocore applications host.
31///
32/// Executes applications that have a WASM module in a background thread per
33/// applications and handles incoming and outgoing messages to the module for
34/// store and communication.
35pub struct Applications<S: Store> {
36    config: Config,
37    cell: Cell,
38    clock: Clock,
39    store: S,
40    apps: Vec<Application>,
41}
42
impl<S: Store> Applications<S> {
    /// Builds the host for every application of `cell` that can be run.
    ///
    /// Applications that are not loaded (`cell_app.get()` returns `None`) are
    /// skipped with a warning, and applications whose manifest declares no
    /// `module` are skipped silently. For the others, the module file path is
    /// resolved inside the application's directory and the application is
    /// validated.
    ///
    /// # Errors
    ///
    /// Fails if an application's directory isn't accessible as an OS path, or
    /// if an application fails validation. A single failing application
    /// aborts the whole construction.
    pub async fn new(
        config: Config,
        clock: Clock,
        cell: Cell,
        store: S,
    ) -> Result<Applications<S>, Error> {
        let mut apps = Vec::new();
        for cell_app in cell.applications().get() {
            let Some(app) = cell_app.get() else {
                warn!(
                    "Application '{}' (id={}) not loaded. Run unpack to load them.",
                    cell_app.name(),
                    cell_app.id()
                );
                continue;
            };

            // Applications without a WASM module have nothing to execute here.
            let app_manifest = app.manifest();
            let Some(module) = &app_manifest.module else {
                continue;
            };

            // The WASM runtime loads the module from a file path, so the
            // application directory must be backed by the OS filesystem.
            let app_dir = app.directory();
            let module_path = app_dir
                .as_os_path()
                .map_err(|err| anyhow!("module file is not accessible via os fs: {}", err))?
                .join(&module.file);

            let app = Application {
                cell: cell.clone(),
                cell_app: app.clone(),
                module_path,
            };
            app.cell_app
                .validate()
                .map_err(|err| anyhow!("Couldn't validate module: {}", err))?;

            apps.push(app);
        }

        Ok(Applications {
            config,
            cell,
            clock,
            store,
            apps,
        })
    }

    /// Starts and runs applications.
    ///
    /// Each application gets its own supervising task (`start_app_loop`) that
    /// restarts it whenever it quits. This future completes once any of those
    /// tasks completes. If there are no applications, it never completes.
    ///
    /// NOTE(review): the remaining tasks' handles are dropped when this
    /// returns; whether that cancels them depends on `owned_spawn` — confirm.
    pub async fn run(self) -> Result<(), Error> {
        if self.apps.is_empty() {
            info!("{}: No apps to start. Blocking forever.", self.cell);
            pending::<()>().await;
            return Ok(());
        }

        let mut spawned_apps = Vec::new();
        for app in self.apps {
            spawned_apps.push(owned_spawn(Self::start_app_loop(
                self.clock.clone(),
                self.config,
                app,
                self.store.clone(),
            )));
        }

        // wait for any applications to terminate
        let _ = select_all(spawned_apps).await;

        Ok(())
    }

    /// Supervises `app`: runs it forever, waiting a backoff delay (computed by
    /// a `BackoffCalculator` built from `config.restart_backoff`) after each
    /// time it quits. This future never completes on its own.
    ///
    /// NOTE(review): the backoff is never reset after a successful run, so a
    /// long-lived app that eventually quits still waits the accumulated delay,
    /// unless `BackoffCalculator` decays failures over time via `clock` —
    /// confirm.
    async fn start_app_loop(clock: Clock, config: Config, app: Application, store: S) {
        let mut backoff = BackoffCalculator::new(clock, config.restart_backoff);
        loop {
            info!(
                "{}: Starting application (version {})",
                app,
                app.cell_app.version()
            );

            let store = store.clone();
            Self::start_app(&app, store).await;

            // Any return from `start_app` is treated as a failure.
            backoff.increment_failure();

            let restart_delay = backoff.backoff_duration();
            error!(
                "{}: Application has quit. Restarting in {:?}...",
                app, restart_delay
            );
            sleep(restart_delay).await;
        }
    }

    /// Runs one instance of `app` until either its WASM runtime or its store
    /// worker stops.
    ///
    /// Two channels connect the host and the module:
    /// - `in_*`: host -> module (`InMessage`s, e.g. store replies);
    /// - `out_*`: module -> host (`OutMessage`s, e.g. store requests), fed by
    ///   the `WiredEnvironment` handed to the runtime.
    async fn start_app(app: &Application, store: S) {
        let (in_sender, in_receiver) = mpsc::channel(MSG_BUFFER_SIZE);
        let (out_sender, mut out_receiver) = mpsc::channel(MSG_BUFFER_SIZE);

        // Spawn the application module runtime on a separate thread.
        let runtime_spawn = {
            let env = Arc::new(WiredEnvironment {
                log_prefix: app.to_string(),
                sender: std::sync::Mutex::new(out_sender),
            });

            let app_module_path = app.module_path.clone();
            let app_prefix = app.to_string();
            spawn_blocking(move || -> Result<(), Error> {
                let mut app_runtime = WasmTimeRuntime::from_file(app_module_path, env)?;
                let mut batch_receiver = BatchingStream::new(in_receiver, RUNTIME_MSG_BATCH_SIZE);

                let mut started = false;
                let mut next_tick = sleep(APP_MIN_TICK_TIME);
                loop {
                    // Wait for whichever comes first: the scheduled tick (an
                    // empty batch) or a batch of incoming messages. This closure
                    // is synchronous, hence `block_on` to drive the wait.
                    let in_messages: Option<Vec<InMessage>> = block_on(async {
                        futures::select! {
                            _ = next_tick.fuse() => Some(vec![]),
                            msgs = batch_receiver.next().fuse() => msgs,
                        }
                    });

                    // `None` means the incoming message stream ended
                    // (presumably all `in_sender`s were dropped).
                    let Some(in_messages) = in_messages else {
                        info!(
                            "{}: In message receiver returned none. Stopping app runtime",
                            app_prefix
                        );
                        return Ok(());
                    };
                    let in_messages_count = in_messages.len();

                    for in_message in in_messages {
                        app_runtime.send_message(in_message)?;
                    }

                    // The runtime may request its next tick time; otherwise
                    // fall back to APP_MIN_TICK_TIME.
                    // NOTE(review): a requested duration shorter than
                    // APP_MIN_TICK_TIME is not clamped — confirm intended.
                    let next_tick_duration = app_runtime.tick()?.unwrap_or(APP_MIN_TICK_TIME);
                    next_tick = sleep(next_tick_duration);

                    debug!(
                        "{}: App ticked. {} incoming message, next tick in {:?}",
                        app_prefix, in_messages_count, next_tick_duration
                    );

                    // Log "started" only once, after the first successful tick.
                    if !started {
                        info!("{}: Application started", app_prefix);
                        started = true;
                    }
                }
            })
        };

        // Spawn a task to handle store requests coming from the application
        //
        // Each store request is executed in its own spawned task via
        // `handle_store_message`; replies go back to the module through the
        // shared `in_sender`. The loop ends when the out channel closes,
        // presumably once the runtime drops its environment (the holder of
        // `out_sender`).
        let store_worker = {
            let store = store.clone();
            let app_prefix = app.to_string();
            async move {
                let in_sender = Arc::new(Mutex::new(in_sender));
                while let Some(message) = out_receiver.next().await {
                    match OutMessageType::try_from(message.r#type) {
                        Ok(OutMessageType::StoreEntityQuery) => {
                            let store = store.clone();
                            handle_store_message(
                                message.rendez_vous_id,
                                InMessageType::StoreEntityResults,
                                in_sender.clone(),
                                move || handle_entity_query(message, store),
                            )
                        }
                        Ok(OutMessageType::StoreMutationRequest) => {
                            let store = store.clone();
                            handle_store_message(
                                message.rendez_vous_id,
                                InMessageType::StoreMutationResult,
                                in_sender.clone(),
                                move || handle_entity_mutation(message, store),
                            )
                        }
                        // Unknown or unsupported types are only logged.
                        // NOTE(review): no reply is sent, so a module awaiting
                        // this rendez-vous may wait indefinitely — confirm.
                        other => {
                            error!(
                                "{}: Got an unknown message type {:?} with id {}",
                                app_prefix, other, message.r#type
                            );
                        }
                    }
                }

                Ok::<(), Error>(())
            }
        };

        // Whichever side stops first ends this instance; the other future is
        // dropped. NOTE(review): dropping the `spawn_blocking` handle likely
        // does not stop the runtime thread itself — confirm it exits on its own.
        futures::select! {
            res = runtime_spawn.fuse() => {
                info!("{}: App runtime spawn has stopped: {:?}", app, res);
            }
            _ = store_worker.fuse() => {
                info!("{}: Store worker task has stopped", app);
            }
        };
    }
}
245
246fn handle_store_message<F, O>(
247    rendez_vous_id: u32,
248    reply_type: InMessageType,
249    in_sender: Arc<Mutex<mpsc::Sender<InMessage>>>,
250    func: F,
251) where
252    F: (FnOnce() -> O) + Send + 'static,
253    O: Future<Output = Result<Vec<u8>, Error>> + Send + 'static,
254{
255    spawn_future(async move {
256        let mut msg = InMessage {
257            r#type: reply_type.into(),
258            rendez_vous_id,
259            ..Default::default()
260        };
261
262        let res = func();
263        match res.await {
264            Ok(res) => msg.data = res,
265            Err(err) => msg.error = err.to_string(),
266        }
267
268        let mut in_sender = in_sender.lock().await;
269        let _ = in_sender.send(msg).await;
270    });
271}
272
273async fn handle_entity_query<S: Store>(
274    out_message: OutMessage,
275    store: S,
276) -> Result<Vec<u8>, Error> {
277    let query = EntityQuery::decode(out_message.data.as_ref())?;
278    let res = store.query(query);
279    let res = res.await?;
280
281    Ok(res.encode_to_vec())
282}
283
284async fn handle_entity_mutation<S: Store>(
285    out_message: OutMessage,
286    store: S,
287) -> Result<Vec<u8>, Error> {
288    let mutation = MutationRequest::decode(out_message.data.as_ref())?;
289    let res = store.mutate(mutation);
290    let res = res.await?;
291
292    Ok(res.encode_to_vec())
293}
294
295struct WiredEnvironment {
296    log_prefix: String,
297    sender: std::sync::Mutex<mpsc::Sender<exocore_protos::apps::OutMessage>>,
298}
299
300impl super::wasmtime::HostEnvironment for WiredEnvironment {
301    fn handle_message(&self, msg: exocore_protos::apps::OutMessage) {
302        let mut sender = self.sender.lock().unwrap();
303        if let Err(err) = sender.try_send(msg) {
304            error!("Couldn't send message via channel: {}", err)
305        }
306    }
307
308    fn handle_log(&self, level: log::Level, msg: &str) {
309        log!(level, "{}: WASM: {}", self.log_prefix, msg);
310    }
311}
312
313struct Application {
314    cell: Cell,
315    cell_app: exocore_core::cell::Application,
316    module_path: PathBuf,
317}
318
319impl std::fmt::Display for Application {
320    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
321        write!(f, "{} App{{{}}}", self.cell, self.cell_app.name())
322    }
323}