use super::{package, queue, service, PackageOptions, Provision};
use anyhow::anyhow;
use bytes::Bytes;
use codec::{Compact, Decode, Encode};
use futures::{future::try_join_all, FutureExt, StreamExt};
use jam_bootstrap_service_common::{Instruction, Instructions};
use jam_program_blob_common::ConventionalMetadata as Metadata;
use jam_std_common::{hash_raw, BlockDesc, ChainSubUpdate, Node, Service};
use jam_tooling::{parsing::DataIdLen, NodeExt as _};
use jam_types::{
max_accumulate_gas, max_refine_gas, AnyHash, Balance, CoreIndex, Hash, HeaderHash, Memo,
RefineContext, ServiceId, Slot, UnsignedGas, WorkItem, WorkItems, WorkPackage, WorkPackageHash,
};
use std::sync::Arc;
const CRATE_NAME: &str = "jam-bootstrap-service";
pub struct BootstrapService {
node: Arc<dyn Node>,
service: Service,
id: ServiceId,
}
impl BootstrapService {
pub async fn query(
node: Arc<dyn Node>,
head: HeaderHash,
service_id: ServiceId,
) -> anyhow::Result<Self> {
let (service, _) = service::query(node.as_ref(), head, service_id, CRATE_NAME).await?;
Ok(Self { node, service, id: service_id })
}
pub fn id(&self) -> ServiceId {
self.id
}
pub fn create_work_item(&self, instructions: Vec<Instruction>) -> WorkItem {
WorkItem {
service: self.id,
code_hash: self.service.code_hash,
payload: Instructions(instructions).into(),
refine_gas_limit: max_refine_gas(),
accumulate_gas_limit: max_accumulate_gas(),
import_segments: Default::default(),
extrinsics: Default::default(),
export_count: 0,
}
}
pub fn create_work_package(&self, context: RefineContext, items: WorkItems) -> WorkPackage {
package::build_work_package(self.id, context, items)
}
#[allow(clippy::use_debug)]
pub async fn submit_work_items(
&self,
mut items: WorkItems,
core: CoreIndex,
) -> anyhow::Result<(WorkPackageHash, BlockDesc)> {
package::adjust_gas(&mut items)?;
let num_items = items.len();
let (context, anchor_slot) = self.node.create_refine_context().await?;
let package = self.create_work_package(context, items);
let anchor = BlockDesc { header_hash: package.context.anchor, slot: anchor_slot };
let package_hash = self.submit_work_package(package, core).await?;
eprintln!(
"Submitted {num_items} item(s) to {CRATE_NAME:?} in package {package_hash} with anchor at #{anchor_slot}"
);
Ok((package_hash, anchor))
}
pub async fn submit_work_item(
&self,
item: WorkItem,
core: CoreIndex,
) -> anyhow::Result<(WorkPackageHash, BlockDesc)> {
let items = vec![item].try_into().expect("Only one item");
self.submit_work_items(items, core).await
}
pub async fn submit_work_package(
&self,
work_package: WorkPackage,
core: CoreIndex,
) -> anyhow::Result<WorkPackageHash> {
let encoded = work_package.encode();
let package_hash = WorkPackageHash(hash_raw(&encoded));
self.node.submit_encoded_work_package(core, encoded.into(), &[]).await?;
Ok(package_hash)
}
#[allow(clippy::use_debug)]
pub async fn submit_instructions(
&self,
instructions: Vec<Instruction>,
core: CoreIndex,
) -> anyhow::Result<(WorkPackageHash, HeaderHash)> {
let num_instructions = instructions.len();
let work_item = self.create_work_item(instructions);
let (context, anchor_slot) = self.node.create_refine_context().await?;
let anchor = context.anchor;
let work_items = vec![work_item].try_into().expect("Only one item");
let work_package = self.create_work_package(context, work_items);
let package_hash = self.submit_work_package(work_package, core).await?;
eprintln!(
"Submitted {num_instructions} instruction(s) to {CRATE_NAME:?} \
in package {package_hash} with anchor at #{anchor_slot}",
);
Ok((package_hash, anchor))
}
pub async fn create_service(
&self,
code: DataIdLen,
amount: Balance,
memo: Memo,
min_item_gas: UnsignedGas,
min_memo_gas: UnsignedGas,
register: Option<String>,
raw: bool,
options: &PackageOptions,
) -> anyhow::Result<()> {
eprintln!("Waiting for network status...");
self.node.wait_for_sync().await?;
let best = self.node.best_block().await?;
let mut was_provided = self
.node
.service_preimage(best.header_hash, self.id, code.hash())
.await?
.is_some();
if let Some(d) = code.data() {
if let Ok((_, Metadata::Info(i))) = <(Compact<u32>, Metadata)>::decode(&mut &d[..]) {
eprintln!("Code identifies as {i}");
}
if options.provision == Provision::Bootstrap && !was_provided {
eprintln!("Providing service code to Bootstrap service...");
self.boot_ensure_provided(d, options).await?;
was_provided = true;
}
}
let code_hash = code.hash().into();
let inst = Instruction::CreateService {
code_hash,
code_len: code.len() as u64,
min_item_gas,
min_memo_gas,
endowment: amount,
memo,
registration: register.map(|r| r.into_bytes()),
};
let item = self.create_work_item(vec![inst]);
if options.queue {
let items = queue::push(item)?;
eprintln!("Item queued; queue now contains {items} items.");
eprintln!("NOTE: No code provision will be made.");
return Ok(())
}
let (package_hash, anchor) = self.submit_work_item(item, options.core()).await?;
let node = self.node.clone();
let results = try_join_all([
self.wait_for_the_service_to_be_created().boxed(),
package::wait_until_ready(node.as_ref(), anchor.header_hash, package_hash)
.map(|result| result.map(|_| 0_u32))
.boxed(),
])
.await?;
let id = results[0];
let best = self.node.best_block().await?;
if self.node.service_preimage(best.header_hash, id, code_hash.0).await?.is_some() {
was_provided = true;
} else if options.provision == Provision::Direct {
if let Some(code_data) = code.into_data() {
let at = self.provide(id, code_data).await?;
eprintln!("Service code provided at slot {at}");
was_provided = true;
}
}
if !was_provided {
eprintln!("Preimage of code hash {code_hash} must be provided!");
}
if raw {
println!("{id:08x}");
}
Ok(())
}
pub async fn boot_solicit(
&self,
items: impl IntoIterator<Item = (Hash, usize)>,
options: &PackageOptions,
) -> anyhow::Result<WorkPackageHash> {
let insts = items
.into_iter()
.map(|(hash, len)| Instruction::Solicit { hash: AnyHash::from(hash), len: len as u64 })
.collect::<Vec<_>>();
let core = options.core();
let (package_hash, _anchor) = self.submit_instructions(insts, core).await?;
Ok(package_hash)
}
pub async fn wait_for_the_service_to_be_created(&self) -> anyhow::Result<ServiceId> {
let mut create_sub = self.node.subscribe_service_value(self.id, b"created", false).await?;
let mut maybe_prior = None;
let id = loop {
let ChainSubUpdate { slot, value, .. } = create_sub
.next()
.await
.ok_or(anyhow!("Subscription to monitor creation terminated unexpectedly"))??;
if let Some(prior) = maybe_prior.as_ref() {
if prior != &value {
if let Some(ref b) = value {
let id = ServiceId::from_le_bytes([b[0], b[1], b[2], b[3]]);
eprintln!("Service {id:08x} created at slot {slot}");
break id
}
}
eprintln!("Creation not detected at slot {slot}.");
} else {
eprintln!("Work package submitted at slot {slot}. Monitoring...");
maybe_prior = Some(value)
}
};
Ok(id)
}
pub async fn provide(&self, id: ServiceId, data: Vec<u8>) -> anyhow::Result<Slot> {
let hash = hash_raw(&data);
let len = data.len() as u32;
let data = Bytes::from(data);
let mut sub = self.node.subscribe_service_request(id, hash, len, false).await?;
loop {
let ChainSubUpdate { value, .. } = sub.next().await.ok_or(anyhow!(
"Subscription to monitor service request terminated unexpectedly"
))??;
match value.map(|v| (v.len(), v)) {
None => break Err(anyhow!("Service request disappeared!")),
Some((0, _)) => {
self.node.submit_preimage(id, data.clone()).await?;
eprintln!("Preimage submitted. Waiting...");
},
Some((1, v)) => break Ok(v[0]),
Some((_, v)) => {
eprintln!(
"Preimage already provided at slot {} and then unrequested at slot {}!",
v[0], v[1]
);
break Err(anyhow!("Service unrequested the preimage!"))
},
}
}
}
async fn boot_ensure_provided(
&self,
data: &[u8],
options: &PackageOptions,
) -> anyhow::Result<Slot> {
let hash = hash_raw(data);
let len = data.len() as u32;
let mut sub = self.node.subscribe_service_request(self.id, hash, len, false).await?;
let slot = loop {
let ChainSubUpdate { value, .. } = sub
.next()
.await
.ok_or(anyhow!("Subscription to monitor creation terminated unexpectedly"))??;
match value.map(|v| (v.len(), v)) {
None => {
self.boot_solicit([(hash, len as usize)], options).await?;
},
Some((0, _)) => {
self.node.submit_preimage(self.id, data.to_vec().into()).await?;
},
Some((2, _)) => {
self.boot_solicit([(hash, len as usize)], options).await?;
},
Some((1, v)) | Some((3, v)) =>
break *v.last().expect("Already matched length 1 or 3; qed"),
_ => unreachable!(),
}
};
Ok(slot)
}
}