use anyhow::anyhow;
use clap::Parser;
use futures::future::try_join_all;
use jam_bootstrap_service_common::{Instruction, Instructions};
use jam_program_blob::{ConventionalMetadata as Metadata, CrateInfo};
use jam_std_common::{
hash_raw, max_accumulate_gas, max_refine_gas, RpcClient as _, Service, StateUpdate,
VersionedParameters,
};
use jam_tooling::{
parsing::{self, Blob, DataIdLen},
query_service, CodeInfo, CommonArgs, Error,
};
use jam_types::{
val_count, AnyVec, AuthParam, Authorization, Authorizer, Balance, BoundedVec, CodeHash,
CoreIndex, HeaderHash, Memo, RefineContext, ServiceId, Slot, UnsignedGas, VecSet, WorkItem,
WorkPackage, WorkPackageHash,
};
use jsonrpsee::ws_client::WsClient;
use scale::{Codec, Compact, Decode, Encode};
// Selects which block an `Inspect` query is evaluated against.
// Plain `//` comments are used on purpose: clap would turn `///` doc comments into help text.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)]
pub enum BlockId {
// The best block as cached by `Context` (see `Context::last`).
Best,
// The finalized block, as reported by the node at the time of the query.
Final,
}
// The piece of chain state printed by the `Inspect` command.
// Plain `//` comments are used on purpose: clap would turn `///` doc comments into help text.
#[derive(clap::Subcommand, Debug, Clone)]
pub enum Inspections {
// Print the slot of the selected block.
Slot,
// Print the value stored under `key` in the storage of service `id`; it is an error if no
// value is found.
Storage {
// Service whose storage is read; parsed by `parsing::service_id`.
#[arg(value_parser = parsing::service_id)]
id: ServiceId,
// Storage key to look up; parsed by `parsing::blob`.
#[arg(value_parser = parsing::blob)]
key: Blob,
},
}
// Top-level sub-commands of the tool, dispatched in `Context::execute`.
// Plain `//` comments are used on purpose: clap would turn `///` doc comments into help text.
// Field order matters: fields without `short`/`long` are positional arguments.
#[derive(clap::Subcommand, Debug, Clone)]
pub enum Commands {
// Ask the bootstrap service to create a new service and, unless disabled, provide its code.
CreateService {
// Identification of the service code; its hash and length are always used, its data only
// when available (`code.data()` may be `None`).
#[arg(value_parser = parsing::data_id_len)]
code: DataIdLen,
// Passed on as the `endowment` of the create instruction.
#[arg(default_value_t = 0)]
amount: Balance,
// Memo passed on with the create instruction.
#[arg(value_parser = parsing::memo, default_value = "")]
memo: Memo,
// Passed on as `min_item_gas` of the create instruction.
#[arg(short = 'G', long, default_value_t = 1_000_000)]
min_item_gas: UnsignedGas,
// Passed on as `min_memo_gas` of the create instruction.
#[arg(short = 'g', long, default_value_t = 1_000_000)]
min_memo_gas: UnsignedGas,
},
// Submit (or queue) a single work item addressed directly at `service_id`.
Item {
// Service the work item is addressed to.
#[arg(value_parser = parsing::service_id)]
service_id: ServiceId,
// Work item payload.
#[arg(value_parser = parsing::blob, default_value = "")]
payload: Blob,
// Refine gas limit; when omitted `max_refine_gas()` is used, which `submit_items` treats
// as "share whatever is left of the maximum".
#[arg(short = 'G', long)]
refine_gas: Option<UnsignedGas>,
// Accumulate gas limit; when omitted `max_accumulate_gas()` is used, handled likewise.
#[arg(short = 'g', long)]
accumulate_gas: Option<UnsignedGas>,
// Becomes the `export_count` of the work item.
#[arg(short = 'e', long, default_value_t = 0)]
exports: u16,
},
// Ask the bootstrap service to transfer `amount` to `service_id`.
Transfer {
// Destination of the transfer.
#[arg(value_parser = parsing::service_id)]
service_id: ServiceId,
// Amount to transfer.
#[arg(default_value_t = 0)]
amount: Balance,
// Memo passed on with the transfer instruction.
#[arg(value_parser = parsing::memo, default_value = "")]
memo: Memo,
// Gas limit of the transfer; defaults to the destination service's `min_memo_gas`.
#[arg(short = 'g', long, value_parser = parsing::gas)]
gas: Option<UnsignedGas>,
},
// Print a piece of chain state as of the selected block.
Inspect {
// Block to inspect.
#[arg(default_value = "best")]
id: BlockId,
// What to print.
#[command(subcommand)]
field: Inspections,
},
// Print how many items the persistent queue currently holds.
Queue,
// Submit every queued item as one work package; the queue is emptied in the process.
Pack,
}
// Command-line arguments of the tool.
// Plain `//` comments are used on purpose: clap would turn `///` doc comments into help text.
#[derive(Parser, Debug)]
#[command(version)]
#[clap(name = "jam")]
pub struct BootArgs {
// Shared arguments; used here to open the RPC connections (`connect_rpc`).
#[command(flatten)]
common: CommonArgs,
// Id of the bootstrap service that executes create/transfer instructions and hosts the
// authorizer code of submitted packages.
#[arg(short = 'b', long, default_value_t = 0)]
bootstrap_service_id: u32,
// When set, `CreateService` stops after creation and does not provide the code preimage.
#[arg(short = 'p', long, action, default_value_t = false)]
no_auto_provide: bool,
// Core to submit work packages to; core 0 is used when omitted.
#[arg(short = 'c', long)]
force_core: Option<CoreIndex>,
// When set, the work item is appended to the persistent queue instead of being submitted.
#[arg(short = 'q', long, default_value_t = false)]
queue: bool,
// The sub-command to run.
#[command(subcommand)]
command: Commands,
}
/// Work items waiting to be packed into a single work package.
///
/// Stored SCALE-encoded under the name `"queue"` via `Context::with_persistent`, so the field
/// layout is part of the on-disk format.
#[derive(Encode, Decode, Debug, Default)]
struct QueueState {
/// Queued items, in the order they were added.
items: Vec<WorkItem>,
}
/// Everything a command needs to run: parsed arguments, node connections and the cached head.
struct Context {
/// The parsed command line.
args: BootArgs,
/// One RPC connection per index in `0..val_count()` (presumably one per validator); the
/// first entry is the primary connection returned by `rpc()`.
rpcs: Vec<WsClient>,
/// Hash and slot of the best block as last fetched from the primary connection.
best: std::cell::RefCell<(HeaderHash, Slot)>,
}
impl Context {
/// Opens the RPC connections and captures the current best block.
///
/// Connection 0 is opened first and its chain parameters are applied; only then are the
/// connections `1..val_count()` opened concurrently. Finally the best block of connection 0 is
/// fetched and cached.
///
/// # Errors
/// Fails if any connection or RPC call fails, or if the parameters cannot be applied.
async fn new(args: BootArgs) -> anyhow::Result<Self> {
    let primary = args.common.connect_rpc(0).await?;
    let VersionedParameters::V1(params) = primary.parameters().await?;
    params.apply().map_err(|s| anyhow!("{}", s))?;

    let others = try_join_all((1..val_count()).map(|i| args.common.connect_rpc(i))).await?;
    let mut rpcs = vec![primary];
    rpcs.extend(others);

    let (head, slot) = rpcs[0].best_block().await?;
    Ok(Self { args, rpcs, best: std::cell::RefCell::new((head.into(), slot)) })
}
/// Returns the primary RPC connection, i.e. the first entry of `rpcs`.
///
/// # Panics
/// Panics if `rpcs` is empty; `new` always stores at least one connection.
fn rpc(&self) -> &WsClient {
    let primary = self.rpcs.first();
    primary.expect("one node guaranteed")
}
/// Re-fetches the best block from the primary connection, caches it and returns it.
#[allow(dead_code)]
async fn bump_best(&self) -> anyhow::Result<(HeaderHash, Slot)> {
    let (hash, slot) = self.rpc().best_block().await?;
    let refreshed: (HeaderHash, Slot) = (hash.into(), slot);
    *self.best.borrow_mut() = refreshed;
    Ok(refreshed)
}
/// Returns a copy of the cached best block as `(hash, slot)`.
fn last(&self) -> (HeaderHash, Slot) {
    let cached = self.best.borrow();
    *cached
}
/// Returns the slot of the cached best block.
#[allow(dead_code)]
fn last_slot(&self) -> Slot {
    let (_, slot) = self.last();
    slot
}
/// Returns the header hash of the cached best block.
fn last_head(&self) -> HeaderHash {
    let (head, _) = self.last();
    head
}
/// Looks up the bootstrap service at the cached best block and prints how it identifies itself.
///
/// # Errors
/// Fails if the service query fails or if the code metadata of the service is not known.
async fn ensure_boot(&self) -> anyhow::Result<Service> {
    let id = self.args.bootstrap_service_id;
    let (boot, maybe_meta) = self.query_service(id).await?;
    let m = match maybe_meta {
        CodeInfo::Known(known) => known,
        _ => return Err(anyhow!("Bootstrap service metadata not found")),
    };
    let authors = m.authors.join(", ");
    println!("Bootstrap service #{id} identifies as {} v{} by {authors}", m.name, m.version);
    Ok(boot)
}
async fn submit_items(
&self,
mut items: Vec<WorkItem>,
) -> anyhow::Result<(Slot, WorkPackageHash)> {
let (anchor, anchor_slot) = self.last();
let state_root = self.rpc().state_root(anchor.0).await?.ok_or(Error::StateRootNotKnown)?;
let beefy_root = self.rpc().beefy_root(anchor.0).await?.ok_or(Error::BeefyRootNotKnown)?;
let (fin_hash, fin_slot) = self.rpc().finalized_block().await?;
let core = self.args.force_core.unwrap_or(0);
let mut fixed_ref_items = 0;
let fixed_ref_gas = items
.iter()
.map(|i| i.refine_gas_limit)
.filter(|&g| g < max_refine_gas())
.inspect(|_| fixed_ref_items += 1)
.sum::<UnsignedGas>();
let float_ref_items = (items.len() - fixed_ref_items) as UnsignedGas;
if float_ref_items > 0 {
let float_ref_gas = (max_refine_gas() - fixed_ref_gas) / float_ref_items;
for g in items
.iter_mut()
.map(|i| &mut i.refine_gas_limit)
.filter(|g| **g >= max_refine_gas())
{
*g = float_ref_gas;
}
}
let mut fixed_acc_items = 0;
let fixed_acc_gas = items
.iter()
.map(|i| i.accumulate_gas_limit)
.filter(|&g| g < max_accumulate_gas())
.inspect(|_| fixed_acc_items += 1)
.sum::<UnsignedGas>();
let float_acc_items = (items.len() - fixed_acc_items) as UnsignedGas;
if float_acc_items > 0 {
let float_acc_gas = (max_accumulate_gas() - fixed_acc_gas) / float_acc_items;
for g in items
.iter_mut()
.map(|i| &mut i.accumulate_gas_limit)
.filter(|g| **g >= max_accumulate_gas())
{
*g = float_acc_gas;
}
}
let package = WorkPackage {
authorization: Authorization::new(),
auth_code_host: self.args.bootstrap_service_id,
authorizer: Authorizer {
code_hash: hash_raw(null_authorizer_bin::BLOB).into(),
param: AuthParam::new(),
},
context: RefineContext {
anchor,
state_root: state_root.into(),
beefy_root: beefy_root.into(),
lookup_anchor: fin_hash.into(),
lookup_anchor_slot: fin_slot,
prerequisites: VecSet::new(),
},
items: items.try_into().map_err(|_| anyhow!("Queue too large"))?,
};
let xts = vec![];
let encoded = package.encode();
let package_hash = hash_raw(&encoded).into();
self.rpc().submit_work_package(core, encoded, xts).await?;
Ok((anchor_slot, package_hash))
}
/// Submits `item` as a work package of its own; see `submit_items`.
async fn submit_item(&self, item: WorkItem) -> anyhow::Result<(Slot, WorkPackageHash)> {
    let batch = vec![item];
    self.submit_items(batch).await
}
/// Queries service `id` and its code information at the cached best block, using the primary
/// connection.
async fn query_service(&self, id: ServiceId) -> anyhow::Result<(Service, CodeInfo<CrateInfo>)> {
    let head = self.last_head();
    query_service(self.rpc(), head.0, id).await
}
/// Runs `f` on the persistent value stored under `name` and writes the outcome back.
///
/// The value lives as a SCALE-encoded file called `name` in the project cache directory.
/// `f` receives `None` when no such file exists. After `f` returns, `Some(_)` is written to
/// the file and `None` removes it.
///
/// # Errors
/// Fails if the platform has no cache directory, on any I/O error other than the file being
/// absent, or if the stored bytes cannot be decoded as `T`.
fn with_persistent<T: Codec, R>(
    &self,
    name: &str,
    f: impl FnOnce(&mut Option<T>) -> R,
) -> anyhow::Result<R> {
    let dirs = directories::ProjectDirs::from("io", "parity", "jam")
        .ok_or_else(|| anyhow!("This platform has no place for persistent data"))?;
    let dir = dirs.cache_dir();
    // Succeeds when the directory already exists, so no prior existence check is needed.
    std::fs::create_dir_all(dir)?;
    let path = dir.join(name);
    // Read directly instead of probing with `exists()`: only a missing file means "no state";
    // every other failure (e.g. lack of permission) is reported rather than silently ignored.
    let mut state = match std::fs::read(&path) {
        Ok(bytes) => Some(T::decode(&mut &bytes[..])?),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
        Err(e) => return Err(e.into()),
    };
    let r = f(&mut state);
    match state {
        Some(s) => std::fs::write(&path, s.encode())?,
        None => match std::fs::remove_file(&path) {
            Ok(()) => {},
            // Nothing was stored in the first place.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {},
            Err(e) => return Err(e.into()),
        },
    }
    Ok(r)
}
fn queue_item(&self, item: WorkItem) -> anyhow::Result<usize> {
self.with_persistent::<QueueState, usize>("queue", |queue| {
let mut q = queue.take().unwrap_or_default();
q.items.push(item);
let r = q.items.len();
*queue = Some(q);
r
})
}
/// Returns a copy of the queued items without modifying the queue; empty if there is none.
fn peek_queue(&self) -> anyhow::Result<Vec<WorkItem>> {
    self.with_persistent::<QueueState, _>("queue", |slot| match slot.as_ref() {
        Some(state) => state.items.clone(),
        None => Vec::new(),
    })
}
/// Removes the persistent queue and returns the items it held; empty if there was none.
fn take_queue(&self) -> anyhow::Result<Vec<WorkItem>> {
    self.with_persistent::<QueueState, _>("queue", |slot| {
        let drained = slot.take();
        drained.map(|state| state.items).unwrap_or_default()
    })
}
async fn execute(self) -> anyhow::Result<()> {
match self.args.command.clone() {
Commands::CreateService { code, amount, memo, min_item_gas, min_memo_gas } => {
if let Some(mut d) = code.data() {
if let Ok((_, Metadata::Info(i))) = <(Compact<u32>, Metadata)>::decode(&mut d) {
let authors = i.authors.join(", ");
println!("Code identifies as {} v{} by {}", i.name, i.version, authors);
}
}
let boot = self.ensure_boot().await?;
let inst = Instruction::CreateService {
code_hash: code.hash().into(),
code_len: code.len() as u64,
min_item_gas,
min_memo_gas,
endowment: amount,
memo,
};
let item = WorkItem {
service: self.args.bootstrap_service_id,
code_hash: boot.code_hash,
payload: Instructions(vec![inst]).into(),
refine_gas_limit: max_refine_gas(),
accumulate_gas_limit: max_accumulate_gas(),
import_segments: BoundedVec::new(),
extrinsics: vec![],
export_count: 0,
};
if self.args.queue {
let items = self.queue_item(item)?;
println!("Item queued; queue now contains {items} items.");
println!("NOTE: No code provision will be made.");
return Ok(())
}
self.submit_item(item).await?;
let mut create_sub = self
.rpc()
.subscribe_service_value(
self.args.bootstrap_service_id,
b"created".to_vec(),
false,
)
.await?;
let mut maybe_prior = None;
let (id, cre_header_hash, cre_slot) = loop {
let StateUpdate { header_hash, slot, value } = create_sub.next().await.ok_or(
anyhow!("Subscription to monitor creation terminated unexpectedly"),
)??;
if let Some(prior) = maybe_prior.as_ref() {
if prior != &value {
if let Some(ref b) = value {
let id = ServiceId::from_le_bytes([b[0], b[1], b[2], b[3]]);
println!("Service {id:08x} created at slot {slot}");
break (id, header_hash, slot);
}
}
println!("Creation not detected at slot {slot}.");
} else {
println!("Work package submitted at slot {slot}. Monitoring...");
maybe_prior = Some(value)
}
};
create_sub.unsubscribe().await?;
if self.args.no_auto_provide {
println!("Preimage of code hash {} must be provided!", CodeHash(code.hash()));
return Ok(())
}
let Some(code_data) = code.data() else {
println!(
"Unable to provide preimage of code hash {} - code unknown.",
CodeHash(code.hash())
);
return Ok(())
};
println!("Providing service code...");
let at = self.provide(id, code_data, cre_header_hash.into(), cre_slot).await?;
println!("Service code provided at slot {at}");
},
Commands::Transfer { service_id, amount, memo, gas } => {
let (service, maybe_meta) = self.query_service(service_id).await?;
if let CodeInfo::Known(meta) = maybe_meta {
println!(
"Submitting to service identifying as {} v{} by {}",
meta.name,
meta.version,
meta.authors.join(", ")
);
}
let boot = self.ensure_boot().await?;
let inst = Instruction::Transfer {
destination: service_id,
amount,
gas_limit: gas.unwrap_or(service.min_memo_gas),
memo,
};
let item = WorkItem {
service: self.args.bootstrap_service_id,
code_hash: boot.code_hash,
payload: Instructions(vec![inst]).into(),
refine_gas_limit: max_refine_gas(),
accumulate_gas_limit: max_accumulate_gas(),
import_segments: BoundedVec::new(),
extrinsics: vec![],
export_count: 0,
};
if self.args.queue {
let items = self.queue_item(item)?;
println!("Item queued; queue now contains {items} items.");
return Ok(())
}
let (at, package_hash) = self.submit_item(item).await?;
println!("Item submitted in package {package_hash} with anchor at #{at}");
},
Commands::Item { service_id, payload, refine_gas, accumulate_gas, exports } => {
let (service, maybe_meta) = self.query_service(service_id).await?;
if let CodeInfo::Known(meta) = maybe_meta {
println!(
"Submitting to service identifying as {} v{} by {}",
meta.name,
meta.version,
meta.authors.join(", ")
);
}
let item = WorkItem {
service: service_id,
code_hash: service.code_hash,
payload: payload.into(),
refine_gas_limit: refine_gas.unwrap_or(max_refine_gas()),
accumulate_gas_limit: accumulate_gas.unwrap_or(max_accumulate_gas()),
import_segments: BoundedVec::new(),
extrinsics: vec![],
export_count: exports,
};
if self.args.queue {
let items = self.queue_item(item)?;
println!("Item queued; queue now contains {items} items.");
return Ok(())
}
let (at, package_hash) = self.submit_item(item).await?;
println!("Item submitted in package {package_hash} with anchor at #{at}");
},
Commands::Pack => {
let items = self.take_queue()?;
if items.is_empty() {
println!("No items in queue.");
return Ok(())
}
let count = items.len();
let (at, package_hash) = self.submit_items(items).await?;
println!(
"{count} item(s) submitted in package {package_hash} with anchor at #{at}"
);
},
Commands::Queue => match self.peek_queue()?.len() {
0 => println!("Queue is empty"),
1 => println!("Queue has 1 item"),
x => println!("Queue has {x} items"),
},
Commands::Inspect { id, field } => {
let (hash, slot) = match id {
BlockId::Best => self.last(),
BlockId::Final =>
self.rpc().finalized_block().await.map(|x| (x.0.into(), x.1))?,
};
match field {
Inspections::Slot => println!("{slot}"),
Inspections::Storage { id, key } => {
let value = self.rpc().service_value(*hash, id, key.clone()).await?;
if let Some(value) = value {
println!("{}", AnyVec::from(value));
} else {
return Err(anyhow!("No value found for key {:?}", key));
}
},
}
},
}
Ok(())
}
/// Provides `data` as a preimage requested by service `id` and returns the slot reported by
/// the primary connection.
///
/// Every connection subscribes to the request for `(hash(data), data.len())` and reacts to
/// each update according to the number of entries in the request's value:
///
/// * no request: an error once `slot >= req_from`, otherwise keep waiting;
/// * empty: submit the preimage; secondary connections are then done, the primary keeps
///   waiting for the provision to show up;
/// * one entry on the primary connection: done, that entry is the result;
/// * anything non-empty on a secondary connection: done;
/// * more than one entry on the primary connection: an error, reported as the service having
///   unrequested its code.
///
/// `req` is passed along with the preimage submission and `req_from` is the slot from which
/// the request is expected to exist.
///
/// # Errors
/// Fails if `data` is too long for its length to fit in a `u32`, on RPC errors, if a
/// subscription ends early, or in the error cases listed above.
async fn provide(
    &self,
    id: ServiceId,
    data: &[u8],
    req: HeaderHash,
    req_from: Slot,
) -> anyhow::Result<Slot> {
    let hash = hash_raw(data);
    // A plain `as u32` would silently truncate and watch a request for the wrong length.
    let len = u32::try_from(data.len())
        .map_err(|_| anyhow!("Preimage of {} bytes is too large to be requested", data.len()))?;
    Ok(try_join_all(self.rpcs.iter().enumerate().map(|(i, rpc)| async move {
        let mut sub = rpc.subscribe_service_request(id, hash, len, false).await?;
        let r = loop {
            let StateUpdate { slot, value, .. } = sub.next().await
                .ok_or(anyhow!("Subscription to monitor service request terminated unexpectedly"))??;
            // Arm order matters: the guards below rely on the earlier arms having matched.
            match value.map(|v| (v.len(), v)) {
                None if slot >= req_from => {
                    break Err(anyhow!("Service request disappeared!"));
                },
                None => {},
                Some((0, _)) => {
                    rpc.submit_preimage(id, data.to_owned(), req.0).await?;
                    if i > 0 {
                        break Ok(slot);
                    }
                    println!("Submitted. Waiting for provision...");
                },
                Some((1, v)) if i == 0 => {
                    break Ok(v[0]);
                },
                _ if i > 0 => {
                    break Ok(slot);
                },
                // Only reached on the primary connection with at least two entries.
                Some((_, v)) => {
                    println!("Service code already provided at slot {} and then unrequested at slot {}!", v[0], v[1]);
                    break Err(anyhow!("Service unrequested its own code!"));
                },
            }
        };
        sub.unsubscribe().await?;
        r
    // `rpcs` is never empty (see `new`), so the primary connection's result is at index 0.
    })).await?[0])
}
}
/// Entry point: parses the command line, connects to the nodes and runs the chosen command.
pub async fn boot_main() -> anyhow::Result<()> {
    let args = BootArgs::parse();
    let context = Context::new(args).await?;
    context.execute().await
}