exocore_apps_host/runtime/
apps.rs1use std::{path::PathBuf, sync::Arc, time::Duration};
2
3use anyhow::anyhow;
4use exocore_core::{
5 cell::Cell,
6 futures::{block_on, owned_spawn, sleep, spawn_blocking, spawn_future, BatchingStream},
7 time::Clock,
8 utils::backoff::BackoffCalculator,
9};
10use exocore_protos::{
11 apps::{in_message::InMessageType, out_message::OutMessageType, InMessage, OutMessage},
12 prost::Message,
13 store::{EntityQuery, MutationRequest},
14};
15use exocore_store::store::Store;
16use futures::{
17 channel::mpsc,
18 future::{pending, select_all, FutureExt},
19 lock::Mutex,
20 Future, SinkExt, StreamExt,
21};
22
23use super::wasmtime::WasmTimeRuntime;
24use crate::{Config, Error};
25
/// Capacity given to each of the two bounded channels (incoming and outgoing
/// messages) created for an application.
const MSG_BUFFER_SIZE: usize = 5_000;

/// Batch size given to the `BatchingStream` that wraps the incoming message
/// receiver on the runtime side.
const RUNTIME_MSG_BATCH_SIZE: usize = 1_000;

/// Delay before the first tick, and fallback delay whenever the module's tick
/// doesn't return a duration for the next one.
const APP_MIN_TICK_TIME: Duration = Duration::from_millis(100);
29
30pub struct Applications<S: Store> {
36 config: Config,
37 cell: Cell,
38 clock: Clock,
39 store: S,
40 apps: Vec<Application>,
41}
42
43impl<S: Store> Applications<S> {
44 pub async fn new(
45 config: Config,
46 clock: Clock,
47 cell: Cell,
48 store: S,
49 ) -> Result<Applications<S>, Error> {
50 let mut apps = Vec::new();
51 for cell_app in cell.applications().get() {
52 let Some(app) = cell_app.get() else {
53 warn!(
54 "Application '{}' (id={}) not loaded. Run unpack to load them.",
55 cell_app.name(),
56 cell_app.id()
57 );
58 continue;
59 };
60
61 let app_manifest = app.manifest();
62 let Some(module) = &app_manifest.module else {
63 continue;
64 };
65
66 let app_dir = app.directory();
67 let module_path = app_dir
68 .as_os_path()
69 .map_err(|err| anyhow!("module file is not accessible via os fs: {}", err))?
70 .join(&module.file);
71
72 let app = Application {
73 cell: cell.clone(),
74 cell_app: app.clone(),
75 module_path,
76 };
77 app.cell_app
78 .validate()
79 .map_err(|err| anyhow!("Couldn't validate module: {}", err))?;
80
81 apps.push(app);
82 }
83
84 Ok(Applications {
85 config,
86 cell,
87 clock,
88 store,
89 apps,
90 })
91 }
92
93 pub async fn run(self) -> Result<(), Error> {
95 if self.apps.is_empty() {
96 info!("{}: No apps to start. Blocking forever.", self.cell);
97 pending::<()>().await;
98 return Ok(());
99 }
100
101 let mut spawned_apps = Vec::new();
102 for app in self.apps {
103 spawned_apps.push(owned_spawn(Self::start_app_loop(
104 self.clock.clone(),
105 self.config,
106 app,
107 self.store.clone(),
108 )));
109 }
110
111 let _ = select_all(spawned_apps).await;
113
114 Ok(())
115 }
116
117 async fn start_app_loop(clock: Clock, config: Config, app: Application, store: S) {
118 let mut backoff = BackoffCalculator::new(clock, config.restart_backoff);
119 loop {
120 info!(
121 "{}: Starting application (version {})",
122 app,
123 app.cell_app.version()
124 );
125
126 let store = store.clone();
127 Self::start_app(&app, store).await;
128
129 backoff.increment_failure();
130
131 let restart_delay = backoff.backoff_duration();
132 error!(
133 "{}: Application has quit. Restarting in {:?}...",
134 app, restart_delay
135 );
136 sleep(restart_delay).await;
137 }
138 }
139
140 async fn start_app(app: &Application, store: S) {
141 let (in_sender, in_receiver) = mpsc::channel(MSG_BUFFER_SIZE);
142 let (out_sender, mut out_receiver) = mpsc::channel(MSG_BUFFER_SIZE);
143
144 let runtime_spawn = {
146 let env = Arc::new(WiredEnvironment {
147 log_prefix: app.to_string(),
148 sender: std::sync::Mutex::new(out_sender),
149 });
150
151 let app_module_path = app.module_path.clone();
152 let app_prefix = app.to_string();
153 spawn_blocking(move || -> Result<(), Error> {
154 let mut app_runtime = WasmTimeRuntime::from_file(app_module_path, env)?;
155 let mut batch_receiver = BatchingStream::new(in_receiver, RUNTIME_MSG_BATCH_SIZE);
156
157 let mut started = false;
158 let mut next_tick = sleep(APP_MIN_TICK_TIME);
159 loop {
160 let in_messages: Option<Vec<InMessage>> = block_on(async {
161 futures::select! {
162 _ = next_tick.fuse() => Some(vec![]),
163 msgs = batch_receiver.next().fuse() => msgs,
164 }
165 });
166
167 let Some(in_messages) = in_messages else {
168 info!(
169 "{}: In message receiver returned none. Stopping app runtime",
170 app_prefix
171 );
172 return Ok(());
173 };
174 let in_messages_count = in_messages.len();
175
176 for in_message in in_messages {
177 app_runtime.send_message(in_message)?;
178 }
179
180 let next_tick_duration = app_runtime.tick()?.unwrap_or(APP_MIN_TICK_TIME);
181 next_tick = sleep(next_tick_duration);
182
183 debug!(
184 "{}: App ticked. {} incoming message, next tick in {:?}",
185 app_prefix, in_messages_count, next_tick_duration
186 );
187
188 if !started {
189 info!("{}: Application started", app_prefix);
190 started = true;
191 }
192 }
193 })
194 };
195
196 let store_worker = {
198 let store = store.clone();
199 let app_prefix = app.to_string();
200 async move {
201 let in_sender = Arc::new(Mutex::new(in_sender));
202 while let Some(message) = out_receiver.next().await {
203 match OutMessageType::try_from(message.r#type) {
204 Ok(OutMessageType::StoreEntityQuery) => {
205 let store = store.clone();
206 handle_store_message(
207 message.rendez_vous_id,
208 InMessageType::StoreEntityResults,
209 in_sender.clone(),
210 move || handle_entity_query(message, store),
211 )
212 }
213 Ok(OutMessageType::StoreMutationRequest) => {
214 let store = store.clone();
215 handle_store_message(
216 message.rendez_vous_id,
217 InMessageType::StoreMutationResult,
218 in_sender.clone(),
219 move || handle_entity_mutation(message, store),
220 )
221 }
222 other => {
223 error!(
224 "{}: Got an unknown message type {:?} with id {}",
225 app_prefix, other, message.r#type
226 );
227 }
228 }
229 }
230
231 Ok::<(), Error>(())
232 }
233 };
234
235 futures::select! {
236 res = runtime_spawn.fuse() => {
237 info!("{}: App runtime spawn has stopped: {:?}", app, res);
238 }
239 _ = store_worker.fuse() => {
240 info!("{}: Store worker task has stopped", app);
241 }
242 };
243 }
244}
245
246fn handle_store_message<F, O>(
247 rendez_vous_id: u32,
248 reply_type: InMessageType,
249 in_sender: Arc<Mutex<mpsc::Sender<InMessage>>>,
250 func: F,
251) where
252 F: (FnOnce() -> O) + Send + 'static,
253 O: Future<Output = Result<Vec<u8>, Error>> + Send + 'static,
254{
255 spawn_future(async move {
256 let mut msg = InMessage {
257 r#type: reply_type.into(),
258 rendez_vous_id,
259 ..Default::default()
260 };
261
262 let res = func();
263 match res.await {
264 Ok(res) => msg.data = res,
265 Err(err) => msg.error = err.to_string(),
266 }
267
268 let mut in_sender = in_sender.lock().await;
269 let _ = in_sender.send(msg).await;
270 });
271}
272
273async fn handle_entity_query<S: Store>(
274 out_message: OutMessage,
275 store: S,
276) -> Result<Vec<u8>, Error> {
277 let query = EntityQuery::decode(out_message.data.as_ref())?;
278 let res = store.query(query);
279 let res = res.await?;
280
281 Ok(res.encode_to_vec())
282}
283
284async fn handle_entity_mutation<S: Store>(
285 out_message: OutMessage,
286 store: S,
287) -> Result<Vec<u8>, Error> {
288 let mutation = MutationRequest::decode(out_message.data.as_ref())?;
289 let res = store.mutate(mutation);
290 let res = res.await?;
291
292 Ok(res.encode_to_vec())
293}
294
295struct WiredEnvironment {
296 log_prefix: String,
297 sender: std::sync::Mutex<mpsc::Sender<exocore_protos::apps::OutMessage>>,
298}
299
300impl super::wasmtime::HostEnvironment for WiredEnvironment {
301 fn handle_message(&self, msg: exocore_protos::apps::OutMessage) {
302 let mut sender = self.sender.lock().unwrap();
303 if let Err(err) = sender.try_send(msg) {
304 error!("Couldn't send message via channel: {}", err)
305 }
306 }
307
308 fn handle_log(&self, level: log::Level, msg: &str) {
309 log!(level, "{}: WASM: {}", self.log_prefix, msg);
310 }
311}
312
313struct Application {
314 cell: Cell,
315 cell_app: exocore_core::cell::Application,
316 module_path: PathBuf,
317}
318
319impl std::fmt::Display for Application {
320 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
321 write!(f, "{} App{{{}}}", self.cell, self.cell_app.name())
322 }
323}