use anyhow::anyhow;
use clap::{Parser, ValueEnum};
use codec::{Codec, Compact, Decode, Encode};
use corevm_host::CoreVmInstruction;
use jam_bootstrap_service_common::{Instruction, Instructions};
use jam_program_blob::{ConventionalMetadata as Metadata, CrateInfo};
use jam_std_common::{
hash_raw, max_accumulate_gas, max_refine_gas, BlockDesc, RpcClient as _, Service, StateUpdate,
VersionedParameters,
};
use jam_tooling::{
parsing::{self, Blob, DataId, DataIdLen},
query_service, CodeInfo, CommonArgs,
};
use jam_types::{
AnyHash, AnyVec, AuthConfig, Authorization, Authorizer, Balance, BoundedVec, CoreIndex, Hash,
HeaderHash, Memo, RefineContext, ServiceId, Slot, ToAny, UnsignedGas, VecSet, WorkItem,
WorkPackage, WorkPackageHash,
};
use jsonrpsee::ws_client::WsClient;
use std::{cell::RefCell, collections::btree_map::BTreeMap};
// Selects which block an `inspect` query is evaluated against.
// NOTE: plain `//` comments are used on purpose; with clap's derive macros `///` doc comments
// would be picked up as help text and change the CLI output.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)]
pub enum BlockId {
// The best block cached on the `Context` (see `Context::last`).
Best,
// The block returned by the RPC `finalized_block` call.
Final,
}
// Read-only queries available under `Commands::Inspect`.
// NOTE: plain `//` comments are used on purpose; `///` would become clap help text.
#[derive(clap::Parser, Debug, Clone)]
pub enum Inspections {
// Print the slot of the selected block.
Slot,
// Print name, version and authors from the service's code metadata, plus the guest code
// metadata when the query reports one.
Service {
#[arg(value_parser = parsing::service_id)]
id: ServiceId,
},
// Print the storage value of service `id` under `key`; fails if there is none.
Storage {
#[arg(value_parser = parsing::service_id)]
id: ServiceId,
#[arg(value_parser = parsing::blob)]
key: Blob,
},
// Report the length of the preimage service `id` holds for the hash of `key`; fails if absent.
Preimage {
#[arg(value_parser = parsing::service_id)]
id: ServiceId,
#[arg(value_parser = parsing::data_id)]
key: DataId,
},
// Describe the status of the preimage request (hash + length of `key`) on service `id`.
Request {
#[arg(value_parser = parsing::service_id)]
id: ServiceId,
#[arg(value_parser = parsing::data_id_len)]
key: DataIdLen,
},
}
// CoreVM-related subcommands (`Commands::Vm`).
// NOTE: plain `//` comments are used on purpose; `///` would become clap help text.
#[derive(clap::Subcommand, Debug, Clone)]
pub enum VmCommands {
// Create a new service running the bundled CoreVM module with `code` as its guest program.
New {
// Guest program blob.
#[arg(value_parser = parsing::blob)]
code: Blob,
// Endowment passed to the `CreateService` instruction.
#[arg(default_value_t = 0)]
amount: Balance,
// Forwarded unchanged as `min_item_gas` of the `CreateService` instruction.
#[arg(short = 'G', long, default_value_t = 1_000_000)]
min_item_gas: UnsignedGas,
// Forwarded unchanged as `min_memo_gas` of the `CreateService` instruction.
#[arg(short = 'g', long, default_value_t = 1_000_000)]
min_memo_gas: UnsignedGas,
},
// Point an existing CoreVM service at a new guest program by sending it a `SetCode` memo.
Upgrade {
// The CoreVM service to upgrade.
#[arg(value_parser = parsing::service_id)]
service_id: ServiceId,
// New guest program blob.
#[arg(value_parser = parsing::blob)]
code: Blob,
},
}
// Top-level subcommands of the tool; each is handled by one arm of `Context::execute`.
// NOTE: plain `//` comments are used on purpose; `///` would become clap help text.
#[derive(clap::Subcommand, Debug, Clone)]
pub enum Commands {
// Ask the Bootstrap service to create a new service from the given code.
CreateService {
// Code hash and length, optionally with the code data itself (see `DataIdLen::data`).
#[arg(value_parser = parsing::data_id_len)]
code: DataIdLen,
// Endowment passed to the `CreateService` instruction.
#[arg(default_value_t = 0)]
amount: Balance,
// Memo passed to the `CreateService` instruction.
#[arg(value_parser = parsing::memo, default_value = "")]
memo: Memo,
// Forwarded unchanged as `min_item_gas` of the instruction.
#[arg(short = 'G', long, default_value_t = 1_000_000)]
min_item_gas: UnsignedGas,
// Forwarded unchanged as `min_memo_gas` of the instruction.
#[arg(short = 'g', long, default_value_t = 1_000_000)]
min_memo_gas: UnsignedGas,
// Optional registration string; its bytes become the instruction's `registration`.
#[arg(short = 'r', long)]
register: Option<String>,
},
// Submit (or queue) a work item addressed directly to `service_id`.
Item {
#[arg(value_parser = parsing::service_id)]
service_id: ServiceId,
// Work item payload.
#[arg(value_parser = parsing::blob, default_value = "")]
payload: Blob,
// Refine gas limit; when absent the maximum is used, which `submit_items` treats as
// "share whatever is left".
#[arg(short = 'G', long)]
refine_gas: Option<UnsignedGas>,
// Accumulate gas limit; same defaulting as `refine_gas`.
#[arg(short = 'g', long)]
accumulate_gas: Option<UnsignedGas>,
// Value for the work item's `export_count`.
#[arg(short = 'e', long, default_value_t = 0)]
exports: u16,
},
// Ask the Bootstrap service to transfer `amount` to `service_id`.
Transfer {
#[arg(value_parser = parsing::service_id)]
service_id: ServiceId,
#[arg(default_value_t = 0)]
amount: Balance,
#[arg(value_parser = parsing::memo, default_value = "")]
memo: Memo,
// Gas limit of the transfer; defaults to the destination service's `min_memo_gas`.
#[arg(short = 'g', long, value_parser = parsing::gas)]
gas: Option<UnsignedGas>,
},
// Print a piece of chain state at the chosen block.
Inspect {
#[arg(default_value = "best")]
id: BlockId,
#[command(subcommand)]
field: Inspections,
},
// Submit `preimage` to service `requester` once it has an open request for it.
Provide {
#[arg(value_parser = parsing::service_id)]
requester: ServiceId,
#[arg(value_parser = parsing::blob)]
preimage: Blob,
},
// Print how many work items are in the persistent queue.
Queue,
// Submit all queued work items as a single work package.
Pack,
// CoreVM-specific operations.
Vm {
#[command(subcommand)]
command: VmCommands,
},
}
// How code/preimages get provided when creating or upgrading services.
// NOTE: plain `//` comments are used on purpose; `///` would become clap help text.
#[derive(ValueEnum, Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum Provision {
// The tool provides nothing itself.
None,
// After creation, provide the code directly to the newly created service.
Direct,
// Before creation, solicit and provide the data through the Bootstrap service (default).
#[default]
Bootstrap,
}
// Command-line arguments of the tool.
// NOTE: plain `//` comments are used on purpose; `///` would become clap help text.
#[derive(Parser, Debug)]
#[command(name = "jamt", version, long_version = format!("{}\nGP {}", env!("CARGO_PKG_VERSION"), jam_types::GP_VERSION))]
pub struct BootArgs {
// Shared tooling arguments; used here to open the RPC connection.
#[command(flatten)]
common: CommonArgs,
// Id of the Bootstrap service that instructions are sent to.
#[arg(short = 'b', long, value_parser = parsing::service_id, default_value_t = 0)]
bootstrap_service_id: ServiceId,
// Provisioning strategy; see `Provision`.
#[arg(short = 'p', long, value_enum, default_value_t = Provision::Bootstrap)]
provision: Provision,
// Core to submit work packages on; core 0 is used when absent.
#[arg(short = 'c', long)]
force_core: Option<CoreIndex>,
// Append the resulting work item to the persistent queue instead of submitting it.
#[arg(short = 'q', long, default_value_t = false)]
queue: bool,
#[command(subcommand)]
command: Commands,
}
/// Persistent queue of work items awaiting `Commands::Pack`.
///
/// Stored SCALE-encoded in the cache directory via `Context::with_persistent`, so the field
/// layout is part of the on-disk format.
#[derive(Encode, Decode, Debug, Default)]
struct QueueState {
/// Queued work items, in the order they were added.
items: Vec<WorkItem>,
}
/// Collects preimages that must be provided to the Bootstrap service, so they can be
/// solicited together in one work item and then submitted one by one.
pub struct BootstrapProvision<'a> {
/// Context supplying the RPC client and the Bootstrap service id.
context: &'a Context,
/// `(hash, data)` pairs registered by `ensure` and not yet provided.
preimages: Vec<(Hash, Vec<u8>)>,
}
impl BootstrapProvision<'_> {
/// Registers `data` for provisioning unless the Bootstrap service already has it available.
///
/// Looks up the preimage request for `hash_raw(data)` with `data.len()` on the Bootstrap
/// service at the context's last known head. A request status of length 1 or 3 means the
/// preimage is currently available (the same interpretation `provide` and
/// `Context::boot_ensure_provided` use); in every other case the data is remembered so that
/// `provide` can solicit and submit it.
///
/// Returns `(needs_providing, hash)`.
///
/// # Errors
/// Propagates any RPC failure.
pub async fn ensure(&mut self, data: &[u8]) -> anyhow::Result<(bool, Hash)> {
    let hash = hash_raw(data);
    let status = self
        .context
        .rpc
        .service_request(
            self.context.last_head(),
            self.context.args.bootstrap_service_id,
            hash,
            data.len() as u32,
        )
        .await?;
    // Previously only length 1 counted as available, so a re-requested and re-provided
    // preimage (length 3) was needlessly solicited again.
    let available = matches!(status.map(|v| v.len()), Some(1) | Some(3));
    if !available {
        self.preimages.push((hash, data.to_vec()));
    }
    Ok((!available, hash))
}
pub async fn provide(self) -> anyhow::Result<BTreeMap<Hash, Slot>> {
self.context
.boot_solicit(self.preimages.iter().map(|(h, d)| (*h, d.len())))
.await?;
let mut items = Vec::new();
for (hash, data) in self.preimages.into_iter() {
let len = data.len() as u32;
let mut sub = self
.context
.rpc
.subscribe_service_request(self.context.args.bootstrap_service_id, hash, len, false)
.await?;
loop {
let StateUpdate { value, .. } = sub
.next()
.await
.ok_or(anyhow!("Subscription to monitor creation terminated unexpectedly"))??;
match value.map(|v| v.len()) {
Some(0) => {
items.push((hash, data.len() as u32));
self.context
.rpc
.submit_preimage(self.context.args.bootstrap_service_id, data.clone())
.await?;
break;
},
Some(1) | Some(3) => {
break
},
_ => {
},
}
}
sub.unsubscribe().await?;
}
let mut map = BTreeMap::new();
for (hash, len) in items.into_iter() {
let mut sub = self
.context
.rpc
.subscribe_service_request(self.context.args.bootstrap_service_id, hash, len, false)
.await?;
let slot = loop {
let StateUpdate { value, .. } = sub
.next()
.await
.ok_or(anyhow!("Subscription to monitor creation terminated unexpectedly"))??;
match value.map(|v| (v.len(), v)) {
Some((1, v)) | Some((3, v)) => {
break *v.last().expect("Already matched length 1; qed");
},
_ => {
},
}
};
sub.unsubscribe().await?;
map.insert(hash, slot);
}
Ok(map)
}
}
/// Everything a command needs while running: parsed arguments, the RPC connection and a few
/// cached chain facts.
struct Context {
/// Parsed command-line arguments.
args: BootArgs,
/// WebSocket RPC client connected to the node.
rpc: WsClient,
/// Most recently fetched best block; refreshed by `bump_best`.
best: RefCell<BlockDesc>,
/// Bootstrap service info, filled lazily by `ensure_boot`.
bootstrap_service: RefCell<Option<Service>>,
}
impl Context {
/// Connects to the node, applies the chain parameters it reports and records the current
/// best block.
///
/// # Errors
/// Fails if the connection, any RPC call, or applying the parameters fails.
async fn new(args: BootArgs) -> anyhow::Result<Self> {
    let rpc = args.common.connect_rpc(0).await?;
    let VersionedParameters::V1(params) = rpc.parameters().await?;
    if let Err(reason) = params.apply() {
        return Err(anyhow!("{}", reason));
    }
    let best = RefCell::new(rpc.best_block().await?);
    Ok(Self { args, rpc, best, bootstrap_service: RefCell::new(None) })
}
/// Creates an empty preimage collector bound to this context.
fn bootstrap_provider(&self) -> BootstrapProvision {
    let preimages = Vec::new();
    BootstrapProvision { context: self, preimages }
}
/// Fetches the node's current best block, caches it and returns it.
async fn bump_best(&self) -> anyhow::Result<BlockDesc> {
    let latest = self.rpc.best_block().await?;
    self.best.replace(latest);
    Ok(latest)
}
/// Returns a copy of the cached best block descriptor.
fn last(&self) -> BlockDesc {
    let cached = self.best.borrow();
    *cached
}
#[allow(dead_code)]
fn last_slot(&self) -> Slot {
self.last().slot
}
fn last_head(&self) -> HeaderHash {
self.last().header_hash
}
/// Returns the Bootstrap service description, querying it on first use and caching it.
///
/// On the first call the service's code metadata is printed.
///
/// # Errors
/// Fails if the query fails or the service's code metadata is not known.
async fn ensure_boot(&self) -> anyhow::Result<Service> {
    // Clone out of the cell so no borrow is held across the await below.
    let cached = self.bootstrap_service.borrow().clone();
    if let Some(boot) = cached {
        return Ok(boot);
    }
    let id = self.args.bootstrap_service_id;
    let (boot, maybe_meta, _) = query_service(&self.rpc, self.last_head(), id).await?;
    let m = match maybe_meta {
        CodeInfo::Known(m) => m,
        _ => return Err(anyhow!("Bootstrap service metadata not found")),
    };
    println!(
        "Found Bootstrap service at #{id}: {} v{} by {}",
        m.name,
        m.version,
        m.authors.join(", ")
    );
    *self.bootstrap_service.borrow_mut() = Some(boot.clone());
    Ok(boot)
}
/// Packs `items` into one work package anchored at the cached best block and submits it.
///
/// Gas handling: an item whose refine (resp. accumulate) limit is below the maximum is
/// "fixed" and left untouched. Items at or above the maximum are "floating": the gas left
/// after subtracting all fixed limits from the maximum is divided evenly between them.
///
/// The package uses the null authorizer hosted on the Bootstrap service, the finalized block
/// as lookup anchor, and is submitted on `--force-core` (core 0 when unset).
///
/// Returns the anchor slot and the hash of the encoded package.
///
/// # Errors
/// Fails on RPC errors, when the fixed gas limits alone exceed the maximum while floating
/// items exist, or when there are more items than a work package may hold.
async fn submit_items(
    &self,
    mut items: Vec<WorkItem>,
) -> anyhow::Result<(Slot, WorkPackageHash)> {
    let BlockDesc { header_hash: anchor, slot: anchor_slot } = self.last();
    let state_root = self.rpc.state_root(anchor).await?;
    let beefy_root = self.rpc.beefy_root(anchor).await?;
    let fin = self.rpc.finalized_block().await?;
    let core = self.args.force_core.unwrap_or(0);

    // Refine gas: share what the fixed items leave over between the floating ones.
    let max_ref = max_refine_gas();
    let mut fixed_ref_items = 0;
    let fixed_ref_gas = items
        .iter()
        .map(|i| i.refine_gas_limit)
        .filter(|&g| g < max_ref)
        .inspect(|_| fixed_ref_items += 1)
        .sum::<UnsignedGas>();
    let float_ref_items = (items.len() - fixed_ref_items) as UnsignedGas;
    if float_ref_items > 0 {
        // Checked: a plain subtraction would panic (debug) or wrap (release) here.
        let spare = max_ref.checked_sub(fixed_ref_gas).ok_or_else(|| {
            anyhow!("Fixed refine gas limits ({fixed_ref_gas}) exceed the maximum ({max_ref})")
        })?;
        let float_ref_gas = spare / float_ref_items;
        for item in items.iter_mut().filter(|i| i.refine_gas_limit >= max_ref) {
            item.refine_gas_limit = float_ref_gas;
        }
    }

    // Accumulate gas: same scheme.
    let max_acc = max_accumulate_gas();
    let mut fixed_acc_items = 0;
    let fixed_acc_gas = items
        .iter()
        .map(|i| i.accumulate_gas_limit)
        .filter(|&g| g < max_acc)
        .inspect(|_| fixed_acc_items += 1)
        .sum::<UnsignedGas>();
    let float_acc_items = (items.len() - fixed_acc_items) as UnsignedGas;
    if float_acc_items > 0 {
        let spare = max_acc.checked_sub(fixed_acc_gas).ok_or_else(|| {
            anyhow!("Fixed accumulate gas limits ({fixed_acc_gas}) exceed the maximum ({max_acc})")
        })?;
        let float_acc_gas = spare / float_acc_items;
        for item in items.iter_mut().filter(|i| i.accumulate_gas_limit >= max_acc) {
            item.accumulate_gas_limit = float_acc_gas;
        }
    }

    let package = WorkPackage {
        authorization: Authorization::new(),
        auth_code_host: self.args.bootstrap_service_id,
        authorizer: Authorizer {
            code_hash: hash_raw(jam_null_authorizer_bin::BLOB).into(),
            config: AuthConfig::new(),
        },
        context: RefineContext {
            anchor,
            state_root,
            beefy_root,
            lookup_anchor: fin.header_hash,
            lookup_anchor_slot: fin.slot,
            prerequisites: VecSet::new(),
        },
        items: items.try_into().map_err(|_| anyhow!("Queue too large"))?,
    };
    // No extrinsic data accompanies the package.
    let xts = vec![];
    let encoded = package.encode();
    let package_hash = hash_raw(&encoded).into();
    self.rpc.submit_work_package(core, encoded, xts).await?;
    Ok((anchor_slot, package_hash))
}
/// Submits a single work item as its own work package; see `submit_items`.
async fn submit_item(&self, item: WorkItem) -> anyhow::Result<(Slot, WorkPackageHash)> {
    let single = vec![item];
    self.submit_items(single).await
}
/// Queries service `id` at the cached best head via the `jam_tooling` free function of the
/// same name, returning the service plus its code metadata and optional inner metadata.
async fn query_service(
    &self,
    id: ServiceId,
) -> anyhow::Result<(Service, CodeInfo<CrateInfo>, Option<CodeInfo<CrateInfo>>)> {
    let head = self.last_head();
    query_service(&self.rpc, head, id).await
}
/// Runs `f` on the persistent value stored under `name` and writes the outcome back.
///
/// The value lives in the project's cache directory. `f` receives `None` when no file
/// exists; leaving `Some` writes the encoded value, leaving `None` deletes the file.
///
/// # Errors
/// Fails if the platform offers no cache directory, on I/O errors, or if the stored bytes
/// cannot be decoded as `T`.
fn with_persistent<T: Codec, R>(
    &self,
    name: &str,
    f: impl FnOnce(&mut Option<T>) -> R,
) -> anyhow::Result<R> {
    let Some(dirs) = directories::ProjectDirs::from("io", "parity", "jam") else {
        return Err(anyhow!("This platform has no place for persistent data"));
    };
    let dir = dirs.cache_dir();
    if !dir.exists() {
        std::fs::create_dir_all(dir)?;
    }
    let path = dir.join(name);
    let mut state = match path.exists() {
        true => {
            let raw = std::fs::read(&path)?;
            Some(T::decode(&mut &raw[..])?)
        },
        false => None,
    };
    let outcome = f(&mut state);
    match state {
        Some(s) => std::fs::write(path, s.encode())?,
        None if path.exists() => std::fs::remove_file(path)?,
        None => {},
    }
    Ok(outcome)
}
fn queue_item(&self, item: WorkItem) -> anyhow::Result<usize> {
self.with_persistent::<QueueState, usize>("queue", |queue| {
let mut q = queue.take().unwrap_or_default();
q.items.push(item);
let r = q.items.len();
*queue = Some(q);
r
})
}
/// Returns a copy of the queued work items without modifying the queue.
fn peek_queue(&self) -> anyhow::Result<Vec<WorkItem>> {
    self.with_persistent::<QueueState, _>("queue", |queue| match queue.as_ref() {
        Some(pending) => pending.items.clone(),
        None => Vec::new(),
    })
}
/// Removes and returns all queued work items, leaving the persistent queue empty.
fn take_queue(&self) -> anyhow::Result<Vec<WorkItem>> {
    self.with_persistent::<QueueState, _>("queue", |queue| {
        let drained = queue.take();
        drained.map(|pending| pending.items).unwrap_or_default()
    })
}
async fn execute(self) -> anyhow::Result<()> {
match self.args.command.clone() {
Commands::CreateService {
code,
amount,
memo,
min_item_gas,
min_memo_gas,
register,
} => {
println!("Waiting for network status...");
jam_tooling::wait_for_sync(&self.rpc).await?;
self.bump_best().await?;
let mut was_provided = self
.rpc
.service_preimage(self.last_head(), self.args.bootstrap_service_id, code.hash())
.await?
.is_some();
if let Some(d) = code.data() {
if let Ok((_, Metadata::Info(i))) =
<(Compact<u32>, Metadata)>::decode(&mut &d[..])
{
let authors = i.authors.join(", ");
println!("Code identifies as {} v{} by {}", i.name, i.version, authors);
}
if self.args.provision == Provision::Bootstrap && !was_provided {
println!("Providing service code to Bootstrap service...");
self.boot_ensure_provided(d).await?;
was_provided = true;
}
}
let code_hash = code.hash().into();
let boot = self.ensure_boot().await?;
let inst = Instruction::CreateService {
code_hash,
code_len: code.len() as u64,
min_item_gas,
min_memo_gas,
endowment: amount,
memo,
registration: register.map(|r| r.into_bytes()),
};
let item = WorkItem {
service: self.args.bootstrap_service_id,
code_hash: boot.code_hash,
payload: Instructions(vec![inst]).into(),
refine_gas_limit: max_refine_gas(),
accumulate_gas_limit: max_accumulate_gas(),
import_segments: BoundedVec::new(),
extrinsics: vec![],
export_count: 0,
};
if self.args.queue {
let items = self.queue_item(item)?;
println!("Item queued; queue now contains {items} items.");
println!("NOTE: No code provision will be made.");
return Ok(())
}
self.submit_item(item).await?;
let id = self.wait_for_the_service_to_be_created().await?;
if self.rpc.service_preimage(self.last_head(), id, code_hash.0).await?.is_some() {
was_provided = true;
} else if self.args.provision == Provision::Direct {
if let Some(code_data) = code.into_data() {
let at = self.provide(id, code_data).await?;
println!("Service code provided at slot {at}");
was_provided = true;
}
}
if !was_provided {
println!("Preimage of code hash {} must be provided!", code_hash);
return Ok(())
}
},
Commands::Transfer { service_id, amount, memo, gas } => {
let (service, maybe_meta, _) = self.query_service(service_id).await?;
if let CodeInfo::Known(meta) = maybe_meta {
println!(
"Submitting to service identifying as {} v{} by {}",
meta.name,
meta.version,
meta.authors.join(", ")
);
}
let boot = self.ensure_boot().await?;
let inst = Instruction::Transfer {
destination: service_id,
amount,
gas_limit: gas.unwrap_or(service.min_memo_gas),
memo,
};
let item = WorkItem {
service: self.args.bootstrap_service_id,
code_hash: boot.code_hash,
payload: Instructions(vec![inst]).into(),
refine_gas_limit: max_refine_gas(),
accumulate_gas_limit: max_accumulate_gas(),
import_segments: BoundedVec::new(),
extrinsics: vec![],
export_count: 0,
};
if self.args.queue {
let items = self.queue_item(item)?;
println!("Item queued; queue now contains {items} items.");
return Ok(())
}
let (at, package_hash) = self.submit_item(item).await?;
println!("Item submitted in package {package_hash} with anchor at #{at}");
},
Commands::Item { service_id, payload, refine_gas, accumulate_gas, exports } => {
let (service, maybe_meta, _) = self.query_service(service_id).await?;
if let CodeInfo::Known(meta) = maybe_meta {
println!(
"Submitting to service identifying as {} v{} by {}",
meta.name,
meta.version,
meta.authors.join(", ")
);
}
let item = WorkItem {
service: service_id,
code_hash: service.code_hash,
payload: payload.into(),
refine_gas_limit: refine_gas.unwrap_or(max_refine_gas()),
accumulate_gas_limit: accumulate_gas.unwrap_or(max_accumulate_gas()),
import_segments: BoundedVec::new(),
extrinsics: vec![],
export_count: exports,
};
if self.args.queue {
let items = self.queue_item(item)?;
println!("Item queued; queue now contains {items} items.");
return Ok(())
}
let (at, package_hash) = self.submit_item(item).await?;
println!("Item submitted in package {package_hash} with anchor at #{at}");
},
Commands::Pack => {
let items = self.take_queue()?;
if items.is_empty() {
println!("No items in queue.");
return Ok(())
}
let count = items.len();
let (at, package_hash) = self.submit_items(items).await?;
println!(
"{count} item(s) submitted in package {package_hash} with anchor at #{at}"
);
},
Commands::Queue => match self.peek_queue()?.len() {
0 => println!("Queue is empty"),
1 => println!("Queue has 1 item"),
x => println!("Queue has {x} items"),
},
Commands::Inspect { id, field } => {
let desc = match id {
BlockId::Best => self.last(),
BlockId::Final => self.rpc.finalized_block().await?,
};
match field {
Inspections::Slot => println!("{}", desc.slot),
Inspections::Service { id } => {
let (_service, maybe_meta, maybe_inner_meta) =
self.query_service(id).await?;
if let CodeInfo::Known(meta) = maybe_meta {
println!(
"Service {} v{} by {}",
meta.name,
meta.version,
meta.authors.join(", ")
);
} else {
println!("Service code not found");
}
match maybe_inner_meta {
None => {},
Some(CodeInfo::Undefined(hash)) => {
println!("Guest code {hash} is defined but unreadable");
},
Some(CodeInfo::CodeNotProvided(hash)) => {
println!("Guest code {hash} is not yet provided");
},
Some(CodeInfo::Known(meta)) => {
println!(
"Guest {} v{} by {}",
meta.name,
meta.version,
meta.authors.join(", ")
);
},
}
},
Inspections::Storage { id, key } => {
let value =
self.rpc.service_value(desc.header_hash, id, key.clone()).await?;
if let Some(value) = value {
println!("{}", AnyVec::from(value));
} else {
return Err(anyhow!("No value found for key {:?}", key));
}
},
Inspections::Request { id, key } => {
let hash = key.hash();
let len = key.len() as u32;
let maybe_request =
self.rpc.service_request(desc.header_hash, id, hash, len).await?;
let Some(request) = maybe_request else {
return Err(anyhow!(
"No preimage request found for hash {:?}",
hash.any()
));
};
match request.len() {
0 => {
println!("Unprovided request found");
},
1 => {
println!("Request active, provided at slot {}", request[0]);
},
2 => {
println!(
"Request inactive since {}, provided at slot {}",
request[1], request[0]
);
},
3 => {
println!("Request active since {}, unrequested at slot {}, provided at slot {}", request[2], request[1], request[0]);
},
_ => unreachable!(),
}
},
Inspections::Preimage { id, key } => {
let hash = key.hash();
let maybe_data =
self.rpc.service_preimage(desc.header_hash, id, hash).await?;
let Some(data) = maybe_data else {
return Err(anyhow!("No preimage provided for hash {:?}", hash.any()));
};
println!("Preimage found of length {}", data.len());
},
}
},
Commands::Provide { requester: service_id, preimage: pre_image } => {
let at = self.provide(service_id, pre_image).await?;
println!("Preimage provided at slot {at}");
},
Commands::Vm { command } => match command {
VmCommands::New { code, amount, min_item_gas, min_memo_gas } => {
if let Ok((_, Metadata::Info(i))) =
<(Compact<u32>, Metadata)>::decode(&mut &corevm_bin::BLOB[..])
{
let authors = i.authors.join(", ");
println!("Using CoreVM module {} v{} by {}", i.name, i.version, authors);
}
if let Ok((_, Metadata::Info(i))) =
<(Compact<u32>, Metadata)>::decode(&mut &code[..])
{
let authors = i.authors.join(", ");
println!("Guest identifies as {} v{} by {}", i.name, i.version, authors);
} else {
println!("Guest code metadata not found");
}
let chunked = corevm_host::File::new(&code);
if self.args.provision == Provision::Bootstrap {
let mut provider = self.bootstrap_provider();
if provider.ensure(corevm_bin::BLOB).await?.0 {
println!("Providing CoreVM module code...");
} else {
println!("CoreVM module already provided.");
}
let blocks = chunked.len();
for (i, (_, block_data)) in chunked.iter().enumerate() {
if provider.ensure(block_data).await?.0 {
println!("Providing guest block {}/{blocks}...", i + 1);
} else {
println!("Guest block {}/{blocks} already provided.", i + 1);
}
}
let m = provider.provide().await?;
println!("Provided {} preimages to Bootstrap service", m.len());
}
let boot = self.ensure_boot().await?;
let mut memo = Memo::zero();
CoreVmInstruction::SetCode {
code_hash: chunked.first_hash().into(),
host: self.args.bootstrap_service_id,
}
.encode_to(&mut &mut memo[..]);
let inst = Instruction::CreateService {
code_hash: hash_raw(corevm_bin::BLOB).into(),
code_len: corevm_bin::BLOB.len() as u64,
min_item_gas,
min_memo_gas,
endowment: amount,
memo,
registration: None,
};
let item = WorkItem {
service: self.args.bootstrap_service_id,
code_hash: boot.code_hash,
payload: Instructions(vec![inst]).into(),
refine_gas_limit: max_refine_gas(),
accumulate_gas_limit: max_accumulate_gas(),
import_segments: BoundedVec::new(),
extrinsics: vec![],
export_count: 0,
};
if self.args.queue {
let items = self.queue_item(item)?;
println!("Item queued; queue now contains {items} items.");
return Ok(())
}
self.submit_item(item).await?;
let id = self.wait_for_the_service_to_be_created().await?;
println!("Service id: {id:x}");
},
VmCommands::Upgrade { service_id, code } => {
let (service, maybe_meta, maybe_inner) = self.query_service(service_id).await?;
if let CodeInfo::Known(meta) = maybe_meta {
if meta.name != "corevm" {
return Err(anyhow!("Service does not appears to be CoreVM"))
}
println!(
"Submitting to CoreVM service identifying as {} v{} by {}",
meta.name,
meta.version,
meta.authors.join(", ")
);
if let Some(CodeInfo::Known(inner)) = maybe_inner {
println!(
"Current guest is {} v{} by {}",
inner.name,
inner.version,
inner.authors.join(", ")
);
} else {
println!("Current guest code metadata not found");
}
}
let chunked = corevm_host::File::new(&code);
if self.args.provision == Provision::Bootstrap {
let mut provider = self.bootstrap_provider();
let blocks = chunked.len();
for (i, (_, block_data)) in chunked.iter().enumerate() {
if provider.ensure(block_data).await?.0 {
println!("Providing guest block {}/{blocks}...", i + 1);
} else {
println!("Guest block {}/{blocks} already provided.", i + 1);
}
}
let m = provider.provide().await?;
println!("Provided {} preimages to Bootstrap service", m.len());
}
let boot = self.ensure_boot().await?;
let mut memo = Memo::zero();
CoreVmInstruction::SetCode {
code_hash: chunked.first_hash().into(),
host: self.args.bootstrap_service_id,
}
.encode_to(&mut &mut memo[..]);
let inst = Instruction::Transfer {
destination: service_id,
amount: 0,
gas_limit: service.min_memo_gas,
memo,
};
let item = WorkItem {
service: self.args.bootstrap_service_id,
code_hash: boot.code_hash,
payload: Instructions(vec![inst]).into(),
refine_gas_limit: max_refine_gas(),
accumulate_gas_limit: max_accumulate_gas(),
import_segments: BoundedVec::new(),
extrinsics: vec![],
export_count: 0,
};
if self.args.queue {
let items = self.queue_item(item)?;
println!("Item queued; queue now contains {items} items.");
return Ok(())
}
self.submit_item(item).await?;
},
},
}
Ok(())
}
/// Drives the Bootstrap service's preimage request for `data` until the preimage is
/// available, and returns the last slot recorded in the request status.
///
/// Reacts to each status update of the request:
/// - no request, or length 2 (previously available, now unrequested): solicit it;
/// - length 0 (requested, not provided): submit the preimage;
/// - length 1 or 3: available, done.
///
/// # Errors
/// Propagates RPC failures, fails if the subscription ends unexpectedly, and fails on a
/// status with any other length.
async fn boot_ensure_provided(&self, data: &[u8]) -> anyhow::Result<Slot> {
    let hash = hash_raw(data);
    let len = data.len() as u32;
    let service = self.args.bootstrap_service_id;
    let mut sub = self.rpc.subscribe_service_request(service, hash, len, false).await?;
    let outcome: anyhow::Result<Slot> = loop {
        let StateUpdate { value, .. } = sub.next().await.ok_or_else(|| {
            anyhow!("Subscription to monitor creation terminated unexpectedly")
        })??;
        match value.map(|v| (v.len(), v)) {
            None | Some((2, _)) => {
                self.boot_solicit([(hash, len as usize)]).await?;
            },
            Some((0, _)) => {
                self.rpc.submit_preimage(service, data.to_owned()).await?;
            },
            Some((1, v)) | Some((3, v)) => {
                break Ok(*v.last().expect("Already matched length 1 or 3; qed"));
            },
            // The status comes from the node; report bad data instead of panicking.
            Some((n, _)) => {
                break Err(anyhow!("Unexpected request status with {n} entries"));
            },
        }
    };
    // Release the subscription explicitly, as the other subscription users in this file do.
    sub.unsubscribe().await?;
    outcome
}
/// Submits one work item asking the Bootstrap service to solicit every `(hash, len)` in
/// `items`, then prints the package hash and anchor slot.
async fn boot_solicit(
    &self,
    items: impl IntoIterator<Item = (Hash, usize)>,
) -> anyhow::Result<()> {
    let boot = self.ensure_boot().await?;
    let mut insts = Vec::new();
    for (hash, len) in items {
        insts.push(Instruction::Solicit { hash: AnyHash::from(hash), len: len as u64 });
    }
    let count = insts.len();
    let payload = Instructions(insts).into();
    let item = WorkItem {
        service: self.args.bootstrap_service_id,
        code_hash: boot.code_hash,
        payload,
        refine_gas_limit: max_refine_gas(),
        accumulate_gas_limit: max_accumulate_gas(),
        import_segments: BoundedVec::new(),
        extrinsics: vec![],
        export_count: 0,
    };
    let (at, package_hash) = self.submit_item(item).await?;
    println!("Solicited {count} items in package {package_hash} with anchor at #{at}");
    Ok(())
}
/// Provides `data` as a preimage to service `id` and returns the slot at which it became
/// available.
///
/// Watches the service's request for `hash_raw(data)` / `data.len()`:
/// - length 0: the request is open, so the preimage is submitted and watching continues;
/// - length 1 or 3: the preimage is available; the last recorded slot is returned;
/// - no request at all, or length 2 (provided earlier, then unrequested): an error.
///
/// # Errors
/// Propagates RPC failures and fails if the subscription ends unexpectedly.
#[allow(dead_code)]
async fn provide(&self, id: ServiceId, data: Vec<u8>) -> anyhow::Result<Slot> {
    let hash = hash_raw(&data);
    let len = data.len() as u32;
    let mut sub = self.rpc.subscribe_service_request(id, hash, len, false).await?;
    let r = loop {
        let StateUpdate { value, .. } = sub.next().await.ok_or_else(|| {
            anyhow!("Subscription to monitor service request terminated unexpectedly")
        })??;
        match value.map(|v| (v.len(), v)) {
            None => break Err(anyhow!("Service request disappeared!")),
            Some((0, _)) => {
                self.rpc.submit_preimage(id, data.clone()).await?;
                println!("Preimage submitted. Waiting...");
            },
            // Length 3 means the preimage was re-requested and is available again; it used
            // to fall into the "unrequested" error arm below.
            Some((1, v)) | Some((3, v)) => {
                break Ok(*v.last().expect("Already matched length 1 or 3; qed"));
            },
            Some((_, v)) => {
                println!(
                    "Preimage already provided at slot {} and then unrequested at slot {}!",
                    v[0], v[1]
                );
                break Err(anyhow!("Service unrequested the preimage!"));
            },
        }
    };
    sub.unsubscribe().await?;
    r
}
async fn wait_for_the_service_to_be_created(&self) -> anyhow::Result<ServiceId> {
let mut create_sub = self
.rpc
.subscribe_service_value(self.args.bootstrap_service_id, b"created".to_vec(), false)
.await?;
let mut maybe_prior = None;
let id = loop {
let StateUpdate { slot, value, .. } = create_sub
.next()
.await
.ok_or(anyhow!("Subscription to monitor creation terminated unexpectedly"))??;
if let Some(prior) = maybe_prior.as_ref() {
if prior != &value {
if let Some(ref b) = value {
let id = ServiceId::from_le_bytes([b[0], b[1], b[2], b[3]]);
println!("Service {id:08x} created at slot {slot}");
break id;
}
}
println!("Creation not detected at slot {slot}.");
} else {
println!("Work package submitted at slot {slot}. Monitoring...");
maybe_prior = Some(value)
}
};
create_sub.unsubscribe().await?;
Ok(id)
}
}
/// Entry point: parses the command line, connects to the node and runs the chosen command.
pub async fn boot_main() -> anyhow::Result<()> {
    let args = BootArgs::parse();
    let context = Context::new(args).await?;
    context.execute().await
}