use super::{package, service, BootstrapService};
use anyhow::anyhow;
use codec::{Decode, Encode};
use corevm_host::{fs as corevm_fs, fs::ReadBlock as _, CoreVmInstruction, ExecEnv, StorageKey};
use corevm_tooling::PreimageReader;
use futures::{future::try_join_all, stream::FuturesUnordered, FutureExt, StreamExt};
use jam_bootstrap_service_common::Instruction;
use jam_std_common::{BlockDesc, Node, NodeExt as _, Service};
use jam_types::{CoreIndex, Hash, HeaderHash, Memo, ServiceId};
use std::sync::Arc;
use tokio::task::spawn_blocking;
mod preimage_iter;
use self::preimage_iter::*;
pub async fn destroy(
node: Arc<dyn Node>,
target: ServiceId,
bootstrap_service_id: ServiceId,
core: CoreIndex,
) -> anyhow::Result<()> {
let BlockDesc { header_hash: head, .. } = node.best_block().await?;
let (corevm, corevm_code) = service::query(node.as_ref(), head, target, "corevm").await?;
let bootstrap = BootstrapService::query(node.clone(), head, bootstrap_service_id).await?;
let exec_env_ref = node
.typed_service_value::<corevm_fs::BlockRef>(head, target, &StorageKey::ExecEnvRef)
.await?;
zombify(node.as_ref(), target, bootstrap_service_id, core, &corevm, &bootstrap).await?;
forget_all_preimages(node, core, &corevm, &bootstrap, corevm_code.len() as u64, exec_env_ref)
.await?;
Ok(())
}
pub async fn preimages(
node: Arc<dyn Node>,
target: ServiceId,
) -> anyhow::Result<Vec<(ServiceId, Hash, u32)>> {
let BlockDesc { header_hash: head, .. } = node.best_block().await?;
let (_corevm, _corevm_code) = service::query(node.as_ref(), head, target, "corevm").await?;
let mut preimages = Vec::new();
if let Some(exec_env_ref) = node
.typed_service_value::<corevm_fs::BlockRef>(head, target, &StorageKey::ExecEnvRef)
.await?
{
fetch_all_preimage_info(node.clone(), head, exec_env_ref, &mut preimages).await?;
}
Ok(preimages)
}
async fn zombify(
node: &dyn Node,
target: ServiceId,
bootstrap_service_id: ServiceId,
core: CoreIndex,
corevm: &Service,
bootstrap: &BootstrapService,
) -> anyhow::Result<()> {
let (package_hash, anchor) = bootstrap
.submit_instructions(
vec![Instruction::Transfer {
destination: target,
amount: 0,
gas_limit: corevm.min_memo_gas,
memo: {
let mut memo = Memo::zero();
let ejector = bootstrap_service_id;
CoreVmInstruction::Destroy { ejector }.encode_to(&mut &mut memo[..]);
memo
},
}],
core,
)
.await?;
try_join_all([
wait_until_storage_is_cleared(node, target).boxed(),
package::wait_until_ready(node, anchor, package_hash)
.map(|result| result.map(|_| ()))
.boxed(),
])
.await?;
Ok(())
}
async fn forget_all_preimages(
node: Arc<dyn Node>,
core: CoreIndex,
corevm: &Service,
bootstrap: &BootstrapService,
corevm_code_len: u64,
exec_env_ref: Option<corevm_fs::BlockRef>,
) -> anyhow::Result<()> {
let BlockDesc { header_hash: head, .. } = node.best_block().await?;
let mut instructions = Vec::new();
instructions
.push(Instruction::Forget { hash: corevm.code_hash.0.into(), len: corevm_code_len });
if let Some(exec_env_ref) = exec_env_ref {
let mut preimages = Vec::new();
fetch_all_preimage_info(node.clone(), head, exec_env_ref, &mut preimages).await?;
instructions.extend(preimages.into_iter().filter_map(|(service_id, hash, len)| {
(service_id == bootstrap.id())
.then_some(Instruction::Forget { hash: hash.into(), len: len.into() })
}));
};
let packages = FuturesUnordered::new();
let mut instructions = instructions.into_iter();
loop {
let chunk: Vec<_> = instructions.by_ref().take(50).collect();
if chunk.is_empty() {
break;
}
let (package_hash, anchor) = bootstrap.submit_instructions(chunk, core).await?;
packages.push(
package::wait_until_ready(node.as_ref(), anchor, package_hash)
.map(move |result| (result, package_hash)),
);
}
package::wait_until_all_ready(packages).await
}
async fn fetch_all_preimage_info(
node: Arc<dyn Node>,
head: HeaderHash,
exec_env_ref: corevm_fs::BlockRef,
preimages: &mut Vec<(ServiceId, Hash, u32)>,
) -> anyhow::Result<()> {
let mut reader = PreimageReader::new(node.clone(), head);
let partial_data: Vec<PreimageParams> = spawn_blocking(move || {
let mut partial_data = Vec::new();
let main_block = {
let block = reader.read_block(&exec_env_ref)?;
{
let corevm_fs::BlockRef { service_id, hash } = exec_env_ref;
let params =
PreimageParams { service_id, hash: hash.0, length: Some(block.len() as u32) };
partial_data.push(params);
}
let main_block = corevm_fs::MainBlock::decode(block)?;
for block_ref in main_block.block_refs() {
let corevm_fs::BlockRef { service_id, hash } = block_ref;
let params = PreimageParams { service_id: *service_id, hash: hash.0, length: None };
partial_data.push(params);
}
main_block
};
let mut file = corevm_fs::NodeReader::from_main_block(main_block, &mut reader);
let exec_env = ExecEnv::decode(&mut file)?;
partial_data.extend(
PreimageIter::new(exec_env.program, &mut reader).filter_map(|entry| entry.ok()),
);
if exec_env.root_dir.hash.0 != [0; 32] {
partial_data.extend(
PreimageIter::new(exec_env.root_dir, &mut reader).filter_map(|entry| entry.ok()),
);
}
anyhow::Result::<_>::Ok(partial_data)
})
.await??;
for PreimageParams { service_id, hash, mut length } in partial_data.into_iter() {
if length.is_none() {
length = node.service_preimage_len(head, service_id, hash).await?;
}
if let Some(length) = length {
preimages.push((service_id, hash, length));
}
}
Ok(())
}
async fn wait_until_storage_is_cleared(node: &dyn Node, target: ServiceId) -> anyhow::Result<()> {
let mut sub = node.subscribe_service_value(target, &StorageKey::Gas.encode(), false).await?;
while let Some(value) = sub.next().await {
if value?.value.is_none() {
return Ok(())
}
}
Err(anyhow!("Service value subscription ended unexpectedly"))
}