jamt 0.1.28

General CLI tool for interacting with JAM nodes
//! CoreVM service interface.

use super::{package, service, BootstrapService};
use anyhow::anyhow;
use codec::{Decode, Encode};
use corevm_host::{fs as corevm_fs, fs::ReadBlock as _, CoreVmInstruction, ExecEnv, StorageKey};
use corevm_tooling::PreimageReader;
use futures::{future::try_join_all, stream::FuturesUnordered, FutureExt, StreamExt};
use jam_bootstrap_service_common::Instruction;
use jam_std_common::{BlockDesc, Node, NodeExt as _, Service};
use jam_types::{CoreIndex, Hash, HeaderHash, Memo, ServiceId};
use std::sync::Arc;
use tokio::task::spawn_blocking;

mod preimage_iter;
use self::preimage_iter::*;

/// Destroys a CoreVM service.
///
/// The service is first zombified with a `Destroy` instruction; afterwards the preimages
/// associated with it (service code, program and root file system) are forgotten.
///
/// All lookups are performed against the best block known to `node` at the time of the call.
/// Instructions are submitted through the bootstrap service identified by
/// `bootstrap_service_id` on the given `core`.
pub async fn destroy(
	node: Arc<dyn Node>,
	target: ServiceId,
	bootstrap_service_id: ServiceId,
	core: CoreIndex,
) -> anyhow::Result<()> {
	let BlockDesc { header_hash: best, .. } = node.best_block().await?;
	let (corevm, corevm_code) = service::query(node.as_ref(), best, target, "corevm").await?;
	let code_len = corevm_code.len() as u64;
	let bootstrap = BootstrapService::query(node.clone(), best, bootstrap_service_id).await?;
	// Read the ExecEnv reference up front: zombifying waits until the service storage is
	// cleared, and the reference is still needed afterwards to locate the preimages.
	let exec_env_ref = node
		.typed_service_value::<corevm_fs::BlockRef>(best, target, &StorageKey::ExecEnvRef)
		.await?;
	zombify(node.as_ref(), target, bootstrap_service_id, core, &corevm, &bootstrap).await?;
	forget_all_preimages(node, core, &corevm, &bootstrap, code_len, exec_env_ref).await
}

/// Lists the preimages referenced by a CoreVM service.
///
/// Each entry of the returned vector is `(service id, preimage hash, preimage length)`.
/// The list is empty when the service has no `ExecEnvRef` storage value at the best block.
pub async fn preimages(
	node: Arc<dyn Node>,
	target: ServiceId,
) -> anyhow::Result<Vec<(ServiceId, Hash, u32)>> {
	let BlockDesc { header_hash: best, .. } = node.best_block().await?;
	// The query result itself is not used; the call is kept for its error path.
	// NOTE(review): presumably this rejects targets that are not CoreVM services — confirm
	// against `service::query`.
	let (_service, _code) = service::query(node.as_ref(), best, target, "corevm").await?;
	let exec_env_ref = node
		.typed_service_value::<corevm_fs::BlockRef>(best, target, &StorageKey::ExecEnvRef)
		.await?;
	let mut found = Vec::new();
	// TODO @ivan We should forget CoreVM preimage here once CoreVM service itself provides it.
	if let Some(root) = exec_env_ref {
		fetch_all_preimage_info(node.clone(), best, root, &mut found).await?;
	}
	Ok(found)
}

/// Submits a `Destroy` instruction to the target service and waits for it to take effect.
///
/// The instruction travels in the memo of a zero-amount transfer issued via `bootstrap`,
/// naming `bootstrap_service_id` as the ejector. The function returns once both the work
/// package is ready and the target's storage has been observed as cleared; it fails as soon
/// as either wait fails.
async fn zombify(
	node: &dyn Node,
	target: ServiceId,
	bootstrap_service_id: ServiceId,
	core: CoreIndex,
	corevm: &Service,
	bootstrap: &BootstrapService,
) -> anyhow::Result<()> {
	// Encode the CoreVM instruction into a zero-initialised memo.
	let mut memo = Memo::zero();
	CoreVmInstruction::Destroy { ejector: bootstrap_service_id }.encode_to(&mut &mut memo[..]);
	let destroy_request = Instruction::Transfer {
		destination: target,
		amount: 0,
		gas_limit: corevm.min_memo_gas,
		memo,
	};
	let (package_hash, anchor) =
		bootstrap.submit_instructions(vec![destroy_request], core).await?;
	// Both conditions are awaited concurrently; the package result value is discarded so
	// that the two futures share the same output type.
	let storage_cleared = wait_until_storage_is_cleared(node, target).boxed();
	let package_ready = package::wait_until_ready(node, anchor, package_hash)
		.map(|outcome| outcome.map(|_ready| ()))
		.boxed();
	try_join_all([storage_cleared, package_ready]).await?;
	Ok(())
}

/// Submits `Forget` instructions for the CoreVM code and for the ExecEnv preimages.
///
/// The code preimage (`corevm.code_hash`, `corevm_code_len`) is always forgotten. When
/// `exec_env_ref` is present, the preimages reachable from it are looked up at the current
/// best block and those provided by the bootstrap service itself are forgotten as well;
/// preimages belonging to other services are left alone.
///
/// Instructions are sent in batches, one work package per batch, and the function returns
/// once all submitted packages are ready.
async fn forget_all_preimages(
	node: Arc<dyn Node>,
	core: CoreIndex,
	corevm: &Service,
	bootstrap: &BootstrapService,
	corevm_code_len: u64,
	exec_env_ref: Option<corevm_fs::BlockRef>,
) -> anyhow::Result<()> {
	// TODO @ivan 50 instruction per package is very conservative.
	const MAX_INSTRUCTIONS_PER_PACKAGE: usize = 50;

	let BlockDesc { header_hash: best, .. } = node.best_block().await?;
	let mut forgets =
		vec![Instruction::Forget { hash: corevm.code_hash.0.into(), len: corevm_code_len }];
	if let Some(root) = exec_env_ref {
		let mut infos = Vec::new();
		fetch_all_preimage_info(node.clone(), best, root, &mut infos).await?;
		for (service_id, hash, len) in infos {
			// Only preimages provided by the bootstrap service can be forgotten through it.
			if service_id == bootstrap.id() {
				forgets.push(Instruction::Forget { hash: hash.into(), len: len.into() });
			}
		}
	}
	// Packages are submitted one after another, but their readiness is awaited together.
	let pending = FuturesUnordered::new();
	let mut remaining = forgets.into_iter().peekable();
	while remaining.peek().is_some() {
		let batch: Vec<_> = remaining.by_ref().take(MAX_INSTRUCTIONS_PER_PACKAGE).collect();
		let (package_hash, anchor) = bootstrap.submit_instructions(batch, core).await?;
		let ready = package::wait_until_ready(node.as_ref(), anchor, package_hash)
			.map(move |outcome| (outcome, package_hash));
		pending.push(ready);
	}
	package::wait_until_all_ready(pending).await
}

async fn fetch_all_preimage_info(
	node: Arc<dyn Node>,
	head: HeaderHash,
	exec_env_ref: corevm_fs::BlockRef,
	preimages: &mut Vec<(ServiceId, Hash, u32)>,
) -> anyhow::Result<()> {
	let mut reader = PreimageReader::new(node.clone(), head);
	let partial_data: Vec<PreimageParams> = spawn_blocking(move || {
		let mut partial_data = Vec::new();
		// Push ExecEnv preimages.
		let main_block = {
			let block = reader.read_block(&exec_env_ref)?;
			{
				let corevm_fs::BlockRef { service_id, hash } = exec_env_ref;
				let params =
					PreimageParams { service_id, hash: hash.0, length: Some(block.len() as u32) };
				partial_data.push(params);
			}
			let main_block = corevm_fs::MainBlock::decode(block)?;
			for block_ref in main_block.block_refs() {
				let corevm_fs::BlockRef { service_id, hash } = block_ref;
				let params = PreimageParams { service_id: *service_id, hash: hash.0, length: None };
				partial_data.push(params);
			}
			main_block
		};
		let mut file = corevm_fs::NodeReader::from_main_block(main_block, &mut reader);
		let exec_env = ExecEnv::decode(&mut file)?;
		// Push program preimages.
		partial_data.extend(
			PreimageIter::new(exec_env.program, &mut reader).filter_map(|entry| entry.ok()),
		);
		// Push root directory preimages.
		if exec_env.root_dir.hash.0 != [0; 32] {
			partial_data.extend(
				PreimageIter::new(exec_env.root_dir, &mut reader).filter_map(|entry| entry.ok()),
			);
		}
		anyhow::Result::<_>::Ok(partial_data)
	})
	.await??;
	for PreimageParams { service_id, hash, mut length } in partial_data.into_iter() {
		if length.is_none() {
			length = node.service_preimage_len(head, service_id, hash).await?;
		}
		if let Some(length) = length {
			preimages.push((service_id, hash, length));
		}
	}
	Ok(())
}

/// Waits until the `Gas` storage value of `target` is reported as absent.
///
/// Returns an error if a subscription update carries an error or if the subscription ends
/// before the value disappears.
async fn wait_until_storage_is_cleared(node: &dyn Node, target: ServiceId) -> anyhow::Result<()> {
	// It's enough to check Gas because it's deleted only if the VM is destroyed.
	let gas_key = StorageKey::Gas.encode();
	let mut updates = node.subscribe_service_value(target, &gas_key, false).await?;
	loop {
		match updates.next().await {
			Some(update) =>
				if update?.value.is_none() {
					return Ok(());
				},
			None => return Err(anyhow!("Service value subscription ended unexpectedly")),
		}
	}
}