use crate::database_thread;
use futures_channel::oneshot;
use futures_lite::{Future, StreamExt as _};
use smol::lock::Mutex;
use smoldot::{executor, trie};
use std::{
iter,
num::NonZero,
pin::{self, Pin},
sync::Arc,
};
/// Configuration for [`RuntimeCachesService::new`].
pub struct Config {
/// Closure that [`RuntimeCachesService::new`] calls with the future of the service's
/// background task. The service never polls that future itself.
pub tasks_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
/// Database from which the `:code` and `:heappages` storage entries of the requested
/// blocks are read.
pub database: Arc<database_thread::DatabaseThread>,
/// Capacity passed to the LRU cache that holds the outcome of previous requests.
pub num_cache_entries: NonZero<usize>,
}
/// Handle to a background task that builds runtimes from the storage of blocks and caches
/// them by block hash. See [`RuntimeCachesService::get`].
pub struct RuntimeCachesService {
/// Sending side of the channel through which requests reach the background task.
/// Locked for the duration of each send.
to_background: Mutex<async_channel::Sender<Message>>,
}
/// Request sent from the public API to the background task.
enum Message {
/// Asks for the runtime of a block.
Get {
/// Hash of the block whose runtime is requested.
block_hash: [u8; 32],
/// Channel on which the background task sends back the outcome of the request.
result_tx: oneshot::Sender<Result<Arc<executor::host::HostVmPrototype>, GetError>>,
},
}
impl RuntimeCachesService {
pub fn new(config: Config) -> Self {
let (to_background, from_foreground) = async_channel::bounded(16);
(config.tasks_executor)(Box::pin(async move {
let mut from_foreground = pin::pin!(from_foreground);
let mut cache =
lru::LruCache::<[u8; 32], Result<_, GetError>>::new(config.num_cache_entries);
loop {
match from_foreground.next().await {
Some(Message::Get {
block_hash,
result_tx,
}) => {
if let Some(cache_entry) = cache.get(&block_hash) {
let _ = result_tx.send(cache_entry.clone());
continue;
}
let (code, heap_pages) = config
.database
.with_database(move |database| {
let code = database.block_storage_get(
&block_hash,
iter::empty::<iter::Empty<_>>(),
trie::bytes_to_nibbles(b":code".iter().copied()).map(u8::from),
);
let heap_pages = database.block_storage_get(
&block_hash,
iter::empty::<iter::Empty<_>>(),
trie::bytes_to_nibbles(b":heappages".iter().copied())
.map(u8::from),
);
(code, heap_pages)
})
.await;
let runtime = match (code, heap_pages) {
(Ok(Some((code, _))), Ok(heap_pages)) => {
match executor::storage_heap_pages_to_value(
heap_pages.as_ref().map(|(h, _)| &h[..]),
) {
Ok(heap_pages) => executor::host::HostVmPrototype::new(
executor::host::Config {
module: &code,
heap_pages,
exec_hint: executor::vm::ExecHint::ValidateAndCompile,
allow_unresolved_imports: true, },
)
.map_err(GetError::InvalidRuntime),
Err(_) => Err(GetError::InvalidHeapPages),
}
}
(Ok(None), Ok(_)) => Err(GetError::NoCode),
(Err(database_thread::StorageAccessError::UnknownBlock), _)
| (_, Err(database_thread::StorageAccessError::UnknownBlock)) => {
let _ = result_tx.send(Err(GetError::UnknownBlock));
continue;
}
(Err(database_thread::StorageAccessError::IncompleteStorage), _)
| (_, Err(database_thread::StorageAccessError::IncompleteStorage)) => {
let _ = result_tx.send(Err(GetError::Pruned));
continue;
}
(Err(database_thread::StorageAccessError::Corrupted(_)), _)
| (_, Err(database_thread::StorageAccessError::Corrupted(_))) => {
let _ = result_tx.send(Err(GetError::CorruptedDatabase));
continue;
}
};
let runtime = runtime.map(Arc::new);
cache.put(block_hash, runtime.clone());
let _ = result_tx.send(runtime);
}
None => {
return;
}
}
}
}));
RuntimeCachesService {
to_background: Mutex::new(to_background),
}
}
pub async fn get(
&self,
block_hash: [u8; 32],
) -> Result<Arc<executor::host::HostVmPrototype>, GetError> {
let (result_tx, result_rx) = oneshot::channel();
let _ = self
.to_background
.lock()
.await
.send(Message::Get {
block_hash,
result_tx,
})
.await;
result_rx.await.unwrap()
}
}
/// Error potentially returned by [`RuntimeCachesService::get`].
#[derive(Debug, Clone, derive_more::Display, derive_more::Error)]
pub enum GetError {
/// The database reported the requested block as unknown.
UnknownBlock,
/// The database reported the storage of the requested block as incomplete.
Pruned,
/// The storage of the block has no entry at the key `:code`.
NoCode,
/// The storage entry at the key `:heappages` couldn't be converted to a heap pages value.
InvalidHeapPages,
/// The database reported a corruption while being accessed.
CorruptedDatabase,
/// Building the virtual machine prototype from the `:code` entry failed.
InvalidRuntime(executor::host::NewErr),
}