jamt 0.1.28

General CLI tool for interacting with JAM nodes
use super::{package, queue, service, PackageOptions, Provision};
use anyhow::anyhow;
use bytes::Bytes;
use codec::{Compact, Decode, Encode};
use futures::{future::try_join_all, FutureExt, StreamExt};
use jam_bootstrap_service_common::{Instruction, Instructions};
use jam_program_blob_common::ConventionalMetadata as Metadata;
use jam_std_common::{hash_raw, BlockDesc, ChainSubUpdate, Node, Service};
use jam_tooling::{parsing::DataIdLen, NodeExt as _};
use jam_types::{
	max_accumulate_gas, max_refine_gas, AnyHash, Balance, CoreIndex, Hash, HeaderHash, Memo,
	RefineContext, ServiceId, Slot, UnsignedGas, WorkItem, WorkItems, WorkPackage, WorkPackageHash,
};
use std::sync::Arc;

const CRATE_NAME: &str = "jam-bootstrap-service";

/// JAM bootstrap service interface.
///
/// Contains methods that simplify interaction with the service.
pub struct BootstrapService {
	/// Node handle through which every query, subscription and submission in this interface is
	/// made.
	node: Arc<dyn Node>,
	/// Service record obtained from `service::query` on construction; its `code_hash` is stamped
	/// into every work item built by `create_work_item`.
	service: Service,
	/// ID of the bootstrap service this interface was queried for.
	id: ServiceId,
}

impl BootstrapService {
	/// Builds a bootstrap service interface for `service_id` as seen at block `head`.
	///
	/// The service is looked up on `node` through `service::query`, which is handed `CRATE_NAME`
	/// as the expected metadata; any error from that lookup (including a metadata mismatch) is
	/// returned to the caller.
	pub async fn query(
		node: Arc<dyn Node>,
		head: HeaderHash,
		service_id: ServiceId,
	) -> anyhow::Result<Self> {
		// Only the service record is kept; the second element of the query result is unused.
		let (service, _) = service::query(&*node, head, service_id, CRATE_NAME).await?;
		Ok(Self { id: service_id, service, node })
	}

	/// Returns the ID of the bootstrap service this interface was created for.
	pub fn id(&self) -> ServiceId {
		self.id
	}

	/// Wraps `instructions` into a work item addressed to the bootstrap service.
	///
	/// The item uses the bootstrap service's ID and code hash, the maximum refine and accumulate
	/// gas limits, default import segments and extrinsics, and an export count of zero.
	pub fn create_work_item(&self, instructions: Vec<Instruction>) -> WorkItem {
		// TODO: @ivan Submit instructions via extrinsics.
		let payload = Instructions(instructions).into();
		WorkItem {
			service: self.id,
			code_hash: self.service.code_hash,
			payload,
			refine_gas_limit: max_refine_gas(),
			accumulate_gas_limit: max_accumulate_gas(),
			export_count: 0,
			import_segments: Default::default(),
			extrinsics: Default::default(),
		}
	}

	/// Creates bootstrap-specific work package from the provided refine context and work items.
	///
	/// Delegates to `package::build_work_package`, passing the bootstrap service's own ID as the
	/// first argument.
	pub fn create_work_package(&self, context: RefineContext, items: WorkItems) -> WorkPackage {
		// TODO: @ivan Select a working authorizer from our (persistent) list and use it.
		package::build_work_package(self.id, context, items)
	}

	/// Packages `items` and submits them to `core`.
	///
	/// The items are first passed through `package::adjust_gas`, then wrapped in a work package
	/// built on a freshly created refine context. On success a summary line is written to stderr.
	///
	/// Returns the work package hash and a description (header hash and slot) of the anchor block.
	// `{CRATE_NAME:?}` is intentional: Debug formatting prints the crate name in quotes.
	#[allow(clippy::use_debug)]
	pub async fn submit_work_items(
		&self,
		mut items: WorkItems,
		core: CoreIndex,
	) -> anyhow::Result<(WorkPackageHash, BlockDesc)> {
		package::adjust_gas(&mut items)?;
		let num_items = items.len();

		let (context, anchor_slot) = self.node.create_refine_context().await?;
		let package = self.create_work_package(context, items);
		// Read the anchor from the built package before the package is moved into submission.
		let header_hash = package.context.anchor;
		let package_hash = self.submit_work_package(package, core).await?;

		eprintln!(
			"Submitted {num_items} item(s) to {CRATE_NAME:?} in package {package_hash} with anchor at #{anchor_slot}"
		);
		Ok((package_hash, BlockDesc { header_hash, slot: anchor_slot }))
	}

	/// Submits bootstrap-specific work item to the specified core.
	///
	/// Returns work package hash and anchor.
	pub async fn submit_work_item(
		&self,
		item: WorkItem,
		core: CoreIndex,
	) -> anyhow::Result<(WorkPackageHash, BlockDesc)> {
		let items = vec![item].try_into().expect("Only one item");
		self.submit_work_items(items, core).await
	}

	/// Encodes `work_package` and submits it to `core` with no extrinsics.
	///
	/// Returns the package hash, computed with `hash_raw` over the encoded bytes that were
	/// submitted.
	pub async fn submit_work_package(
		&self,
		work_package: WorkPackage,
		core: CoreIndex,
	) -> anyhow::Result<WorkPackageHash> {
		let bytes = work_package.encode();
		// Hash before submitting: the encoded buffer is consumed by the submission call.
		let digest = hash_raw(&bytes);
		self.node.submit_encoded_work_package(core, bytes.into(), &[]).await?;
		Ok(WorkPackageHash(digest))
	}

	/// Submits instructions to the bootstrap service using the specified core.
	///
	/// Returns work package hash and anchor.
	#[allow(clippy::use_debug)]
	pub async fn submit_instructions(
		&self,
		instructions: Vec<Instruction>,
		// TODO: @ivan Select best core based on how many authorizers we know for the current auth
		// pool.
		core: CoreIndex,
	) -> anyhow::Result<(WorkPackageHash, HeaderHash)> {
		let num_instructions = instructions.len();
		let work_item = self.create_work_item(instructions);
		let (context, anchor_slot) = self.node.create_refine_context().await?;
		let anchor = context.anchor;
		let work_items = vec![work_item].try_into().expect("Only one item");
		let work_package = self.create_work_package(context, work_items);
		let package_hash = self.submit_work_package(work_package, core).await?;
		eprintln!(
			"Submitted {num_instructions} instruction(s) to {CRATE_NAME:?} \
            in package {package_hash} with anchor at #{anchor_slot}",
		);
		Ok((package_hash, anchor))
	}

	/// Creates a new service by sending a `CreateService` instruction to the bootstrap service.
	///
	/// Steps, in order:
	/// 1. Waits for the node to sync and asks the node whether the bootstrap service already has
	///    a preimage for the hash of `code` at the best block.
	/// 2. If `code` carries data and `options.provision` is `Provision::Bootstrap`, makes sure the
	///    code is provided to the bootstrap service first (see `boot_ensure_provided`).
	/// 3. Builds the `CreateService` work item. If `options.queue` is set the item is handed to
	///    `queue::push` and the function returns without submitting anything.
	/// 4. Otherwise submits the item on `options.core()` and concurrently waits for the new
	///    service ID to show up and for `package::wait_until_ready` to complete.
	/// 5. If the new service has no preimage for the code hash and `options.provision` is
	///    `Provision::Direct`, provides the code data to the new service directly.
	///
	/// Progress messages go to stderr. When `raw` is set, the new service ID is printed to stdout
	/// as eight hex digits.
	///
	/// # Parameters
	/// - `code`: hash/length (and optionally data) of the service code.
	/// - `amount`: endowment placed in the instruction.
	/// - `memo`, `min_item_gas`, `min_memo_gas`: forwarded unchanged into the instruction.
	/// - `register`: optional registration string, forwarded as its UTF-8 bytes.
	/// - `raw`: print the created service ID on stdout.
	/// - `options`: provisioning mode, queueing flag and target core.
	///
	/// # Errors
	/// Returns an error if any node call, the queue push, the submission or either of the waits
	/// fails.
	pub async fn create_service(
		&self,
		code: DataIdLen,
		amount: Balance,
		memo: Memo,
		min_item_gas: UnsignedGas,
		min_memo_gas: UnsignedGas,
		register: Option<String>,
		raw: bool,
		options: &PackageOptions,
	) -> anyhow::Result<()> {
		eprintln!("Waiting for network status...");
		self.node.wait_for_sync().await?;
		let best = self.node.best_block().await?;

		// Tracks whether the code preimage is known to be available somewhere relevant: it starts
		// from the bootstrap service's view and is updated as provisioning steps succeed.
		let mut was_provided = self
			.node
			.service_preimage(best.header_hash, self.id, code.hash())
			.await?
			.is_some();

		if let Some(d) = code.data() {
			// Best-effort: if the blob starts with a compact length followed by conventional
			// metadata, report what the code says it is. Decode failures are ignored.
			if let Ok((_, Metadata::Info(i))) = <(Compact<u32>, Metadata)>::decode(&mut &d[..]) {
				eprintln!("Code identifies as {i}");
			}
			if options.provision == Provision::Bootstrap && !was_provided {
				eprintln!("Providing service code to Bootstrap service...");
				self.boot_ensure_provided(d, options).await?;
				was_provided = true;
			}
		}

		// The target type of `into()` is fixed by the `code_hash` field of the instruction below.
		let code_hash = code.hash().into();
		let inst = Instruction::CreateService {
			code_hash,
			code_len: code.len() as u64,
			min_item_gas,
			min_memo_gas,
			endowment: amount,
			memo,
			registration: register.map(|r| r.into_bytes()),
		};
		let item = self.create_work_item(vec![inst]);
		if options.queue {
			// Queued items are not submitted here, so nothing below (waiting for creation,
			// direct provisioning) runs.
			let items = queue::push(item)?;
			eprintln!("Item queued; queue now contains {items} items.");
			eprintln!("NOTE: No code provision will be made.");
			return Ok(())
		}
		let (package_hash, anchor) = self.submit_work_item(item, options.core()).await?;
		let node = self.node.clone();

		// Run both waits concurrently. The second future's output is mapped to a dummy `0_u32`
		// only so both array elements have the same output type; the new service ID is the
		// first result.
		let results = try_join_all([
			self.wait_for_the_service_to_be_created().boxed(),
			package::wait_until_ready(node.as_ref(), anchor.header_hash, package_hash)
				.map(|result| result.map(|_| 0_u32))
				.boxed(),
		])
		.await?;
		let id = results[0];
		let best = self.node.best_block().await?;

		// Re-check against the *new* service; provide the code directly only if it is still
		// missing there, direct provisioning was requested and the code data is at hand.
		if self.node.service_preimage(best.header_hash, id, code_hash.0).await?.is_some() {
			was_provided = true;
		} else if options.provision == Provision::Direct {
			if let Some(code_data) = code.into_data() {
				let at = self.provide(id, code_data).await?;
				eprintln!("Service code provided at slot {at}");
				was_provided = true;
			}
		}
		if !was_provided {
			eprintln!("Preimage of code hash {code_hash} must be provided!");
		}
		if raw {
			println!("{id:08x}");
		}
		Ok(())
	}

	/// Asks the bootstrap service to solicit the given preimages.
	///
	/// Each `(hash, len)` pair in `items` becomes one `Solicit` instruction; all instructions are
	/// submitted together on `options.core()`. Returns the hash of the submitted work package.
	pub async fn boot_solicit(
		&self,
		items: impl IntoIterator<Item = (Hash, usize)>,
		options: &PackageOptions,
	) -> anyhow::Result<WorkPackageHash> {
		let mut insts = Vec::new();
		for (hash, len) in items {
			insts.push(Instruction::Solicit { hash: AnyHash::from(hash), len: len as u64 });
		}
		// The anchor returned alongside the hash is not needed by callers of this method.
		let (package_hash, _anchor) = self.submit_instructions(insts, options.core()).await?;
		Ok(package_hash)
	}

	// Wait for the service to be created.
	// TODO: This needs to be done in a much more robust way. Right now it'll break if
	//   anyone else happens to be trying to create a service at the same time.
	pub async fn wait_for_the_service_to_be_created(&self) -> anyhow::Result<ServiceId> {
		let mut create_sub = self.node.subscribe_service_value(self.id, b"created", false).await?;
		let mut maybe_prior = None;
		let id = loop {
			let ChainSubUpdate { slot, value, .. } = create_sub
				.next()
				.await
				.ok_or(anyhow!("Subscription to monitor creation terminated unexpectedly"))??;
			if let Some(prior) = maybe_prior.as_ref() {
				if prior != &value {
					if let Some(ref b) = value {
						let id = ServiceId::from_le_bytes([b[0], b[1], b[2], b[3]]);
						eprintln!("Service {id:08x} created at slot {slot}");
						break id
					}
				}
				eprintln!("Creation not detected at slot {slot}.");
			} else {
				eprintln!("Work package submitted at slot {slot}. Monitoring...");
				maybe_prior = Some(value)
			}
		};
		// TODO: We may want to use id to initialize it via a work-item.
		Ok(id)
	}

	/// Provides the preimage `data` to service `id` and waits until it is reported as provided.
	///
	/// Subscribes to the service's request for `(hash(data), data.len())` and reacts to each
	/// update according to the number of entries in the reported status:
	/// - no status at all: the request does not exist; fails.
	/// - 0 entries: submits the preimage and keeps waiting.
	/// - 1 entry: done; that entry is returned as the slot at which the preimage was provided.
	/// - 2 or more entries: the preimage was provided and later unrequested; fails.
	///
	/// # Errors
	/// Returns an error if `data` is too long for its length to fit in `u32`, if the
	/// subscription cannot be opened, ends or yields an error, if submitting the preimage fails,
	/// or in the failure cases listed above.
	pub async fn provide(&self, id: ServiceId, data: Vec<u8>) -> anyhow::Result<Slot> {
		let hash = hash_raw(&data);
		// A truncating cast here would silently subscribe to the wrong (hash, len) request.
		let len = u32::try_from(data.len()).map_err(|_| {
			anyhow!("Preimage of {} bytes is too large for a service request", data.len())
		})?;
		// `Bytes` makes the per-submission clone below cheap.
		let data = Bytes::from(data);
		let mut sub = self.node.subscribe_service_request(id, hash, len, false).await?;
		loop {
			let ChainSubUpdate { value, .. } = sub.next().await.ok_or_else(|| {
				anyhow!("Subscription to monitor service request terminated unexpectedly")
			})??;
			match value.map(|v| (v.len(), v)) {
				None => break Err(anyhow!("Service request disappeared!")),
				Some((0, _)) => {
					self.node.submit_preimage(id, data.clone()).await?;
					eprintln!("Preimage submitted. Waiting...");
				},
				Some((1, v)) => break Ok(v[0]),
				// Length is at least 2 here, so both indices are in bounds.
				Some((_, v)) => {
					eprintln!(
						"Preimage already provided at slot {} and then unrequested at slot {}!",
						v[0], v[1]
					);
					break Err(anyhow!("Service unrequested the preimage!"))
				},
			}
		}
	}

	/// Makes sure the bootstrap service itself holds the preimage `data`.
	///
	/// Subscribes to the bootstrap service's request for `(hash(data), data.len())` and drives it
	/// forward based on the number of entries in each reported status:
	/// - no status, or 2 entries: asks the bootstrap service to solicit the preimage.
	/// - 0 entries: submits the preimage to the bootstrap service.
	/// - 1 or 3 entries: done; the last entry is returned as the slot.
	///
	/// # Errors
	/// Returns an error if `data` is too long for its length to fit in `u32`, if the
	/// subscription cannot be opened, ends or yields an error, if soliciting or submitting fails,
	/// or if the node reports a status with more than three entries.
	async fn boot_ensure_provided(
		&self,
		data: &[u8],
		options: &PackageOptions,
	) -> anyhow::Result<Slot> {
		let hash = hash_raw(data);
		// A truncating cast here would silently subscribe to the wrong (hash, len) request.
		let len = u32::try_from(data.len()).map_err(|_| {
			anyhow!("Preimage of {} bytes is too large for a service request", data.len())
		})?;
		let mut sub = self.node.subscribe_service_request(self.id, hash, len, false).await?;
		let slot = loop {
			let ChainSubUpdate { value, .. } = sub.next().await.ok_or_else(|| {
				anyhow!("Subscription to monitor service request terminated unexpectedly")
			})??;
			match value.map(|v| (v.len(), v)) {
				// Not requested (or no longer requested): have the bootstrap service solicit it.
				None | Some((2, _)) => {
					self.boot_solicit([(hash, data.len())], options).await?;
				},
				// Requested but not yet provided: supply the data.
				Some((0, _)) => {
					self.node.submit_preimage(self.id, data.to_vec().into()).await?;
				},
				Some((1 | 3, v)) =>
					break *v.last().expect("Already matched length 1 or 3; qed"),
				// The status comes from the node; report anything unexpected instead of
				// panicking.
				Some((n, _)) =>
					return Err(anyhow!("Unexpected service request status with {n} entries")),
			}
		};
		Ok(slot)
	}
}