jamt 0.1.28

General CLI tool for interacting with JAM nodes
//! Utilities for interacting with work packages.

use super::{formatting::print_service_info, queue, BootstrapService, PackageOptions};
use anyhow::anyhow;
use codec::Encode;
use corevm_tooling::ArcNodeExt;
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use jam_bootstrap_service_common::Instruction;
use jam_std_common::{hash_raw, Node, Service, WorkPackageStatus};
use jam_tooling::{parsing::Blob, NodeExt as _};
use jam_types::{
	core_count, max_accumulate_gas, max_refine_gas, max_work_items, AuthConfig, Authorization,
	Authorizer, Balance, BoundedVec, CoreIndex, HeaderHash, ImportSpec, Memo, RefineContext,
	ServiceId, UnsignedGas, WorkItem, WorkItems, WorkPackage, WorkPackageHash, WorkReportHash,
};
use std::{future::Future, sync::Arc};

/// Assembles a [`WorkPackage`] that is authorized by the null authorizer.
///
/// `auth_code_host` is stored as the package's auth code host, `context` and `items` are moved
/// into the package unchanged. Both the authorization and the authorizer configuration are
/// created via their `new()` constructors.
pub fn build_work_package(
	auth_code_host: ServiceId,
	context: RefineContext,
	items: WorkItems,
) -> WorkPackage {
	let authorization = Authorization::new();
	// The authorizer code is identified by the hash exported by the null authorizer binary crate.
	let authorizer = Authorizer {
		code_hash: jam_null_authorizer_bin::HASH.into(),
		config: AuthConfig::new(),
	};
	WorkPackage { authorization, auth_code_host, authorizer, context, items }
}

/// Waits until the package is refined.
///
/// Subscribes to the status of `package_hash` on `node` and follows the updates until a final
/// one arrives. `anchor` is the work package anchor.
///
/// Returns the work report hash once the package status becomes `Ready`.
///
/// # Errors
///
/// Fails if the subscription cannot be created, if it yields an error, if it ends before a
/// final status is seen, or if the package is reported as `Failed`.
pub async fn wait_until_ready(
	node: &dyn Node,
	anchor: HeaderHash,
	package_hash: WorkPackageHash,
) -> anyhow::Result<WorkReportHash> {
	let mut sub = node.subscribe_work_package_status(package_hash, anchor, false).await?;
	loop {
		// Build the "subscription ended" error lazily: it is only needed when the stream is
		// exhausted, not on every status update.
		let update = sub
			.next()
			.await
			.ok_or_else(|| {
				anyhow!("Package {package_hash} status subscription unexpectedly ended")
			})??;
		match update.value {
			// Intermediate states; keep waiting for the next update.
			WorkPackageStatus::Reportable { .. } | WorkPackageStatus::Reported { .. } => {},
			WorkPackageStatus::Ready { report_hash, .. } => break Ok(report_hash),
			WorkPackageStatus::Failed(message) =>
				break Err(anyhow!("Package {package_hash} failed: {message}")),
		}
	}
}

/// Drives every [`wait_until_ready`]-based task in `tasks` to completion.
///
/// Each task yields its result together with the hash of the package it was waiting for. A
/// line is printed to stderr per finished task: the report hash on success, the error
/// otherwise.
///
/// Returns an error if at least one task failed; all tasks are still awaited in that case.
pub async fn wait_until_all_ready(
	mut tasks: FuturesUnordered<
		impl Future<Output = (anyhow::Result<WorkReportHash>, WorkPackageHash)>,
	>,
) -> anyhow::Result<()> {
	let mut any_failed = false;
	while let Some((outcome, package_hash)) = tasks.next().await {
		let report_hash = match outcome {
			Ok(hash) => hash,
			Err(error) => {
				// Remember the failure but keep draining the remaining tasks.
				any_failed = true;
				eprintln!("{error}");
				continue;
			},
		};
		eprintln!("Package {package_hash} finished; report hash is {report_hash}");
	}
	if any_failed {
		Err(anyhow!("Some packages failed"))
	} else {
		Ok(())
	}
}

/// Rebalances the refine and accumulate gas limits of items that request the maximum.
///
/// An item whose limit differs from the maximum keeps its limit. Whatever gas is left after
/// subtracting those fixed limits is shared between the items that asked for the maximum.
/// Packages with at most one item are returned unchanged.
///
/// # Errors
///
/// Fails if the fixed limits alone add up to more than the maximum (checked separately for
/// refine and accumulate gas).
pub fn adjust_gas(items: &mut WorkItems) -> anyhow::Result<()> {
	// A lone item (or none) has nobody to share the gas with.
	if items.len() > 1 {
		adjust_refine_gas(items)?;
		adjust_accumulate_gas(items)?;
	}
	Ok(())
}

/// Applies the gas rebalancing to the `refine_gas_limit` of every item, using
/// `max_refine_gas()` as the budget.
fn adjust_refine_gas(items: &mut WorkItems) -> anyhow::Result<()> {
	let budget = max_refine_gas();
	let requested = items.iter().map(|item| item.refine_gas_limit);
	let (adjustable, fixed_total) = gas_stats(requested, budget)?;
	let limits = items.iter_mut().map(|item| &mut item.refine_gas_limit);
	distribute_gas(limits, budget, adjustable, fixed_total);
	Ok(())
}

/// Applies the gas rebalancing to the `accumulate_gas_limit` of every item, using
/// `max_accumulate_gas()` as the budget.
fn adjust_accumulate_gas(items: &mut WorkItems) -> anyhow::Result<()> {
	let budget = max_accumulate_gas();
	let requested = items.iter().map(|item| item.accumulate_gas_limit);
	let (adjustable, fixed_total) = gas_stats(requested, budget)?;
	let limits = items.iter_mut().map(|item| &mut item.accumulate_gas_limit);
	distribute_gas(limits, budget, adjustable, fixed_total);
	Ok(())
}

/// Classifies gas limits into adjustable ones (equal to `max_gas`) and fixed ones (anything
/// else).
///
/// Returns the number of adjustable limits and the sum of the fixed limits. Fails if that sum
/// exceeds `max_gas`.
fn gas_stats(
	items: impl IntoIterator<Item = UnsignedGas>,
	max_gas: UnsignedGas,
) -> anyhow::Result<(usize, UnsignedGas)> {
	// Accumulate the fixed gas in a wider type so the sum itself cannot overflow.
	let (adjustable, fixed_total) =
		items.into_iter().fold((0_usize, 0_u128), |(adjustable, fixed_total), gas| {
			if gas == max_gas {
				(adjustable + 1, fixed_total)
			} else {
				(adjustable, fixed_total + u128::from(gas))
			}
		});
	if fixed_total > u128::from(max_gas) {
		return Err(anyhow!("Too much gas requested"));
	}
	// The check above guarantees the sum fits back into `UnsignedGas`.
	Ok((adjustable, fixed_total as UnsignedGas))
}

/// Shares the gas left after the fixed limits between the adjustable limits.
///
/// Every limit in `items` that currently equals `max_gas` is overwritten; all others are left
/// alone. Each adjustable limit receives `ceil((max_gas - total_fixed_gas) /
/// num_adjustable_items)`, capped by what is still left, so trailing items may receive less.
///
/// Expects `total_fixed_gas <= max_gas`; `gas_stats` rejects inputs that violate this.
fn distribute_gas<'a>(
	items: impl ExactSizeIterator<Item = &'a mut UnsignedGas>,
	max_gas: UnsignedGas,
	num_adjustable_items: usize,
	total_fixed_gas: UnsignedGas,
) {
	// Without adjustable items every limit stays as requested.
	if num_adjustable_items == 0 {
		return;
	}
	let mut budget = max_gas - total_fixed_gas;
	// Round up so that the whole budget is handed out; the cap below absorbs the excess.
	let share = budget.div_ceil(num_adjustable_items as u64);
	for limit in items.filter(|gas| **gas == max_gas) {
		let granted = share.min(budget);
		*limit = granted;
		budget -= granted;
	}
}

/// Builds work items and packages for a service and submits them through a node.
pub struct PackageBuilder<N>
where
	N: Node + ArcNodeExt + Send + Sync + 'static,
{
	/// Node used for service queries, refine-context creation and package submission.
	node: Arc<N>,
	/// Service used as auth code host of the packages built by this builder.
	service_id: ServiceId,
	/// Options controlling queueing and core selection.
	options: PackageOptions,
}

impl<N: Node + ArcNodeExt + Send + Sync + 'static> PackageBuilder<N> {
	pub fn new(node: Arc<N>, service_id: ServiceId, options: PackageOptions) -> Self {
		Self { node, service_id, options }
	}

	pub async fn transfer(
		&self,
		boot: &BootstrapService,
		service_id: ServiceId,
		amount: Balance,
		memo: Memo,
		gas: Option<UnsignedGas>,
	) -> anyhow::Result<()> {
		let service = self.query_service(service_id).await?;
		let inst = Instruction::Transfer {
			destination: service_id,
			amount,
			gas_limit: gas.unwrap_or(service.min_memo_gas),
			memo,
		};
		let item = boot.create_work_item(vec![inst]);
		if self.options.queue {
			let items = queue::push(item)?;
			eprintln!("Item queued; queue now contains {items} items.");
			return Ok(())
		}
		let (package_hash, anchor) = boot.submit_work_item(item, self.options.core()).await?;
		wait_until_ready(self.node.as_ref(), anchor.header_hash, package_hash).await?;
		Ok(())
	}

	pub async fn item(
		&self,
		service_id: ServiceId,
		payload: Blob,
		refine_gas: Option<UnsignedGas>,
		accumulate_gas: Option<UnsignedGas>,
		exports: u16,
		import: Vec<ImportSpec>,
	) -> anyhow::Result<()> {
		let service = self.query_service(service_id).await?;
		let import_segments = import.try_into().map_err(|_| {
			anyhow!("Too many imports for workitem, expect at most {}.", jam_types::max_imports(),)
		})?;
		let item = WorkItem {
			service: service_id,
			code_hash: service.code_hash,
			payload: payload.into(),
			refine_gas_limit: refine_gas.unwrap_or(max_refine_gas()),
			accumulate_gas_limit: accumulate_gas.unwrap_or(max_accumulate_gas()),
			import_segments,
			extrinsics: BoundedVec::new(),
			export_count: exports,
		};

		if self.options.queue {
			let items = queue::push(item)?;
			eprintln!("Item queued; queue now contains {items} items.");
			return Ok(())
		}

		let (context, anchor_slot) = self.node.create_refine_context().await?;
		let package_hash = self
			.submit_package(
				context,
				vec![item].try_into().expect("Only one item"),
				self.options.core(),
			)
			.await?;
		eprintln!("Item submitted in package {package_hash} with anchor at #{anchor_slot}");

		// TODO: option to wait until reported and print WR hash/seg-root.
		Ok(())
	}

	pub async fn pack_list(
		&self,
		service_id: ServiceId,
		payloads: Vec<Blob>,
		refine_gas: Option<UnsignedGas>,
		accumulate_gas: Option<UnsignedGas>,
	) -> anyhow::Result<()> {
		let service = self.query_service(service_id).await?;
		let payloads = if payloads.is_empty() { vec![Blob::default()] } else { payloads };
		let start_core = self.options.core();
		let items: Vec<WorkItem> = payloads
			.into_iter()
			.map(|payload| WorkItem {
				service: service_id,
				code_hash: service.code_hash,
				payload: payload.into(),
				refine_gas_limit: refine_gas.unwrap_or(max_refine_gas()),
				accumulate_gas_limit: accumulate_gas.unwrap_or(max_accumulate_gas()),
				import_segments: BoundedVec::new(),
				extrinsics: BoundedVec::new(),
				export_count: 0,
			})
			.collect();

		let (context, anchor_slot) = self.node.create_refine_context().await?;
		let mut submitted = 0;
		for (i, item) in items.into_iter().enumerate() {
			let core = (start_core + i as CoreIndex) % core_count() as CoreIndex;
			if let Err(e) = self
				.submit_package(
					context.clone(),
					vec![item].try_into().expect("Only one item"),
					core,
				)
				.await
			{
				eprintln!("Error submitting package for core {core}: {e}");
				continue;
			};
			submitted += 1;
		}
		eprintln!("Submitted {submitted} work package(s) with anchor at #{anchor_slot}");
		Ok(())
	}

	pub async fn pack(&self, boot: &BootstrapService) -> anyhow::Result<()> {
		let items = queue::take()?;
		if items.is_empty() {
			eprintln!("No items in queue.");
			return Ok(())
		}
		let count = items.len();
		let core = self.options.core();
		let packages = FuturesUnordered::new();
		for some_items in items.chunks(max_work_items()) {
			let some_items = some_items
				.to_vec()
				.try_into()
				.expect("Chunk size is bounded by max_work_items()");
			let (package_hash, anchor) = boot.submit_work_items(some_items, core).await?;
			eprintln!(
				"{count} item(s) submitted in package {package_hash} with anchor at #{}",
				anchor.slot
			);
			packages.push(
				wait_until_ready(self.node.as_ref(), anchor.header_hash, package_hash)
					.map(move |result| (result, package_hash)),
			);
		}
		wait_until_all_ready(packages).await
	}

	async fn query_service(&self, service_id: ServiceId) -> anyhow::Result<Service> {
		let best = self.node.best_block().await?;
		let (service, outer, maybe_inner) =
			self.node.query_service_and_corevm_guest(best.header_hash, service_id).await?;
		print_service_info(outer.as_ref(), maybe_inner.as_ref());
		Ok(service)
	}

	async fn submit_package(
		&self,
		context: RefineContext,
		items: WorkItems,
		core: CoreIndex,
	) -> anyhow::Result<WorkPackageHash> {
		let package = build_work_package(self.service_id, context, items);
		let encoded = package.encode();
		let package_hash = WorkPackageHash(hash_raw(&encoded));
		self.node.submit_encoded_work_package(core, encoded.into(), &[]).await?;
		Ok(package_hash)
	}
}

#[cfg(test)]
mod tests {
	use super::*;

	/// Checks the gas split for an all-adjustable input and for one with a fixed-gas item.
	#[test]
	fn distribute_gas_works() {
		const MAX_GAS: UnsignedGas = 100;
		// (initial limits, number of adjustable items, total fixed gas, expected limits)
		let cases: [([UnsignedGas; 3], usize, UnsignedGas, [UnsignedGas; 3]); 2] = [
			([100, 100, 100], 3, 0, [34, 34, 32]),
			([50, 100, 100], 2, 50, [50, 25, 25]),
		];
		for (mut limits, adjustable, fixed, expected) in cases {
			distribute_gas(limits.iter_mut(), MAX_GAS, adjustable, fixed);
			assert_eq!(expected, limits);
		}
	}
}