use super::{formatting::print_service_info, queue, BootstrapService, PackageOptions};
use anyhow::anyhow;
use codec::Encode;
use corevm_tooling::ArcNodeExt;
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use jam_bootstrap_service_common::Instruction;
use jam_std_common::{hash_raw, Node, Service, WorkPackageStatus};
use jam_tooling::{parsing::Blob, NodeExt as _};
use jam_types::{
core_count, max_accumulate_gas, max_refine_gas, max_work_items, AuthConfig, Authorization,
Authorizer, Balance, BoundedVec, CoreIndex, HeaderHash, ImportSpec, Memo, RefineContext,
ServiceId, UnsignedGas, WorkItem, WorkItems, WorkPackage, WorkPackageHash, WorkReportHash,
};
use std::{future::Future, sync::Arc};
/// Assembles a [`WorkPackage`] from a refine context and a set of work items.
///
/// The package carries a freshly constructed [`Authorization`] and an [`Authorizer`] whose code
/// hash is the one exported by `jam_null_authorizer_bin`, paired with a freshly constructed
/// [`AuthConfig`].
///
/// * `auth_code_host` - stored verbatim in the package's `auth_code_host` field.
/// * `context` - refine context the package is built against.
/// * `items` - the work items carried by the package.
pub fn build_work_package(
    auth_code_host: ServiceId,
    context: RefineContext,
    items: WorkItems,
) -> WorkPackage {
    let authorization = Authorization::new();
    let authorizer = Authorizer {
        code_hash: jam_null_authorizer_bin::HASH.into(),
        config: AuthConfig::new(),
    };
    WorkPackage { authorization, auth_code_host, authorizer, context, items }
}
/// Waits until the work package `package_hash` reaches the `Ready` status.
///
/// Subscribes to status updates for the package (relative to `anchor`) on `node` and consumes
/// them until a terminal status is seen. `Reportable` and `Reported` updates are skipped.
///
/// # Errors
///
/// Returns an error if:
/// * the subscription cannot be created or yields an error,
/// * the subscription ends before a terminal status is seen,
/// * the package reports `Failed`; the failure message is included in the error.
pub async fn wait_until_ready(
    node: &dyn Node,
    anchor: HeaderHash,
    package_hash: WorkPackageHash,
) -> anyhow::Result<WorkReportHash> {
    let mut sub = node.subscribe_work_package_status(package_hash, anchor, false).await?;
    loop {
        // Build the "subscription ended" error lazily: it is only needed when the stream runs
        // dry, not on every status update.
        let status = sub
            .next()
            .await
            .ok_or_else(|| {
                anyhow!("Package {package_hash} status subscription unexpectedly ended")
            })??
            .value;
        match status {
            // Intermediate states: keep waiting.
            WorkPackageStatus::Reportable { .. } | WorkPackageStatus::Reported { .. } => {},
            WorkPackageStatus::Ready { report_hash, .. } => break Ok(report_hash),
            WorkPackageStatus::Failed(message) =>
                break Err(anyhow!("Package {package_hash} failed: {message}")),
        }
    }
}
/// Drives every future in `tasks` to completion and reports each outcome on stderr.
///
/// Each future resolves to the result of waiting for one package together with that package's
/// hash. Successful packages are logged with their report hash; failures are logged with their
/// error. All tasks are awaited even if some of them fail.
///
/// # Errors
///
/// Returns an error once every task has finished if at least one of them failed.
pub async fn wait_until_all_ready(
    mut tasks: FuturesUnordered<
        impl Future<Output = (anyhow::Result<WorkReportHash>, WorkPackageHash)>,
    >,
) -> anyhow::Result<()> {
    let mut some_failed = false;
    while let Some((outcome, package_hash)) = tasks.next().await {
        let report_hash = match outcome {
            Ok(hash) => hash,
            Err(error) => {
                eprintln!("{error}");
                some_failed = true;
                continue;
            },
        };
        eprintln!("Package {package_hash} finished; report hash is {report_hash}");
    }
    if some_failed {
        Err(anyhow!("Some packages failed"))
    } else {
        Ok(())
    }
}
/// Rebalances the refine and accumulate gas limits of `items`.
///
/// Collections with at most one item are left untouched. Otherwise the refine limits are
/// adjusted first, then the accumulate limits.
///
/// # Errors
///
/// Propagates the error of either adjustment step.
pub fn adjust_gas(items: &mut WorkItems) -> anyhow::Result<()> {
    if items.len() > 1 {
        adjust_refine_gas(items)?;
        adjust_accumulate_gas(items)?;
    }
    Ok(())
}
/// Splits the refine gas budget between the items whose refine limit equals `max_refine_gas()`.
///
/// Items with any other refine limit keep it; their sum is subtracted from the budget before
/// the remainder is shared out.
///
/// # Errors
///
/// Fails if the limits that are not equal to the maximum add up to more than the maximum.
fn adjust_refine_gas(items: &mut WorkItems) -> anyhow::Result<()> {
    let budget = max_refine_gas();
    let requested = items.iter().map(|item| item.refine_gas_limit);
    let (adjustable, fixed) = gas_stats(requested, budget)?;
    let limits = items.iter_mut().map(|item| &mut item.refine_gas_limit);
    distribute_gas(limits, budget, adjustable, fixed);
    Ok(())
}
/// Splits the accumulate gas budget between the items whose accumulate limit equals
/// `max_accumulate_gas()`.
///
/// Items with any other accumulate limit keep it; their sum is subtracted from the budget
/// before the remainder is shared out.
///
/// # Errors
///
/// Fails if the limits that are not equal to the maximum add up to more than the maximum.
fn adjust_accumulate_gas(items: &mut WorkItems) -> anyhow::Result<()> {
    let budget = max_accumulate_gas();
    let requested = items.iter().map(|item| item.accumulate_gas_limit);
    let (adjustable, fixed) = gas_stats(requested, budget)?;
    let limits = items.iter_mut().map(|item| &mut item.accumulate_gas_limit);
    distribute_gas(limits, budget, adjustable, fixed);
    Ok(())
}
/// Classifies a sequence of gas limits relative to `max_gas`.
///
/// A limit equal to `max_gas` counts as "adjustable"; every other limit is "fixed" and is
/// added to a running total.
///
/// Returns `(number of adjustable limits, sum of fixed limits)`.
///
/// # Errors
///
/// Fails with "Too much gas requested" if the fixed limits add up to more than `max_gas`.
fn gas_stats(
    items: impl IntoIterator<Item = UnsignedGas>,
    max_gas: UnsignedGas,
) -> anyhow::Result<(usize, UnsignedGas)> {
    // The sum is accumulated in `u128` so that adding many limits cannot overflow.
    let (adjustable, fixed_sum) =
        items.into_iter().fold((0_usize, 0_u128), |(adjustable, fixed_sum), gas| {
            if gas == max_gas {
                (adjustable + 1, fixed_sum)
            } else {
                (adjustable, fixed_sum + u128::from(gas))
            }
        });
    if fixed_sum > u128::from(max_gas) {
        return Err(anyhow!("Too much gas requested"));
    }
    // The check above bounds `fixed_sum` by `max_gas`, so narrowing it back is lossless.
    Ok((adjustable, fixed_sum as UnsignedGas))
}
/// Shares the gas left after the fixed limits between the adjustable entries of `items`.
///
/// An entry is adjustable when it currently equals `max_gas`; all other entries are left
/// alone. Each adjustable entry receives `ceil(remaining / num_adjustable_items)`, capped by
/// what is still left, so the last entries may receive less than the first ones.
///
/// * `num_adjustable_items` - number of entries equal to `max_gas`; nothing happens when zero.
/// * `total_fixed_gas` - sum of the non-adjustable entries; the caller must ensure it does
///   not exceed `max_gas`.
fn distribute_gas<'a>(
    items: impl ExactSizeIterator<Item = &'a mut UnsignedGas>,
    max_gas: UnsignedGas,
    num_adjustable_items: usize,
    total_fixed_gas: UnsignedGas,
) {
    if num_adjustable_items == 0 {
        return;
    }
    let budget = max_gas - total_fixed_gas;
    let share = budget.div_ceil(num_adjustable_items as u64);
    let mut left = budget;
    for slot in items.filter(|gas| **gas == max_gas) {
        // Rounding the share up can exhaust the budget early, hence the cap.
        let granted = share.min(left);
        *slot = granted;
        left -= granted;
    }
}
/// Builds work items and work packages for a service and submits them through a node.
pub struct PackageBuilder<N>
where
    N: Node + ArcNodeExt + Send + Sync + 'static,
{
    /// Node used for queries, package submission and status subscriptions.
    node: Arc<N>,
    /// Service passed as the authorizer code host when a package is assembled.
    service_id: ServiceId,
    /// Submission options; provides the target core and the queueing flag.
    options: PackageOptions,
}
impl<N: Node + ArcNodeExt + Send + Sync + 'static> PackageBuilder<N> {
    /// Creates a builder that talks to `node`.
    ///
    /// `service_id` is used as the authorizer code host of every package assembled by this
    /// builder; `options` supplies the target core and the queueing flag.
    pub fn new(node: Arc<N>, service_id: ServiceId, options: PackageOptions) -> Self {
        Self { node, service_id, options }
    }

    /// Transfers `amount` to the service `service_id` via the bootstrap service.
    ///
    /// The destination service is queried first; when `gas` is `None` the transfer's gas
    /// limit defaults to that service's `min_memo_gas`. If queueing is enabled in the options
    /// the resulting work item is pushed onto the queue instead of being submitted. Otherwise
    /// the item is submitted on the configured core and this call waits until the package is
    /// ready.
    ///
    /// # Errors
    ///
    /// Fails if the service query, the queue push, the submission or the wait fails.
    pub async fn transfer(
        &self,
        boot: &BootstrapService,
        service_id: ServiceId,
        amount: Balance,
        memo: Memo,
        gas: Option<UnsignedGas>,
    ) -> anyhow::Result<()> {
        let service = self.query_service(service_id).await?;
        let inst = Instruction::Transfer {
            destination: service_id,
            amount,
            gas_limit: gas.unwrap_or(service.min_memo_gas),
            memo,
        };
        let item = boot.create_work_item(vec![inst]);
        if self.options.queue {
            let items = queue::push(item)?;
            eprintln!("Item queued; queue now contains {items} items.");
            return Ok(());
        }
        let (package_hash, anchor) = boot.submit_work_item(item, self.options.core()).await?;
        wait_until_ready(self.node.as_ref(), anchor.header_hash, package_hash).await?;
        Ok(())
    }

    /// Creates a single work item for `service_id` and either queues or submits it.
    ///
    /// The item uses the service's current code hash. Missing gas limits default to
    /// `max_refine_gas()` / `max_accumulate_gas()`. With queueing enabled the item is pushed
    /// onto the queue; otherwise it is submitted on its own in a fresh package on the
    /// configured core. This method does not wait for the package to become ready.
    ///
    /// # Errors
    ///
    /// Fails if the service query fails, if `import` holds more entries than a work item
    /// accepts, or if queueing, refine-context creation or submission fails.
    pub async fn item(
        &self,
        service_id: ServiceId,
        payload: Blob,
        refine_gas: Option<UnsignedGas>,
        accumulate_gas: Option<UnsignedGas>,
        exports: u16,
        import: Vec<ImportSpec>,
    ) -> anyhow::Result<()> {
        let service = self.query_service(service_id).await?;
        let import_segments = import.try_into().map_err(|_| {
            anyhow!("Too many imports for workitem, expect at most {}.", jam_types::max_imports(),)
        })?;
        let item = WorkItem {
            service: service_id,
            code_hash: service.code_hash,
            payload: payload.into(),
            refine_gas_limit: refine_gas.unwrap_or(max_refine_gas()),
            accumulate_gas_limit: accumulate_gas.unwrap_or(max_accumulate_gas()),
            import_segments,
            extrinsics: BoundedVec::new(),
            export_count: exports,
        };
        if self.options.queue {
            let items = queue::push(item)?;
            eprintln!("Item queued; queue now contains {items} items.");
            return Ok(());
        }
        let (context, anchor_slot) = self.node.create_refine_context().await?;
        let package_hash = self
            .submit_package(
                context,
                vec![item].try_into().expect("Only one item"),
                self.options.core(),
            )
            .await?;
        eprintln!("Item submitted in package {package_hash} with anchor at #{anchor_slot}");
        Ok(())
    }

    /// Submits one single-item package per payload, spread over consecutive cores.
    ///
    /// An empty `payloads` list is treated as one default payload. All packages share the same
    /// refine context. Package `i` goes to core `(configured core + i) % core_count()`.
    /// Submission errors are reported on stderr and do not abort the remaining submissions;
    /// the number of successful submissions is printed at the end.
    ///
    /// # Errors
    ///
    /// Fails only if the service query or the refine-context creation fails.
    pub async fn pack_list(
        &self,
        service_id: ServiceId,
        payloads: Vec<Blob>,
        refine_gas: Option<UnsignedGas>,
        accumulate_gas: Option<UnsignedGas>,
    ) -> anyhow::Result<()> {
        let service = self.query_service(service_id).await?;
        let payloads = if payloads.is_empty() { vec![Blob::default()] } else { payloads };
        let start_core = self.options.core();
        // The limits are identical for every item, so resolve the defaults once.
        let refine_gas_limit = refine_gas.unwrap_or(max_refine_gas());
        let accumulate_gas_limit = accumulate_gas.unwrap_or(max_accumulate_gas());
        let items: Vec<WorkItem> = payloads
            .into_iter()
            .map(|payload| WorkItem {
                service: service_id,
                code_hash: service.code_hash,
                payload: payload.into(),
                refine_gas_limit,
                accumulate_gas_limit,
                import_segments: BoundedVec::new(),
                extrinsics: BoundedVec::new(),
                export_count: 0,
            })
            .collect();
        let (context, anchor_slot) = self.node.create_refine_context().await?;
        let mut submitted = 0;
        for (i, item) in items.into_iter().enumerate() {
            let core = (start_core + i as CoreIndex) % core_count() as CoreIndex;
            if let Err(e) = self
                .submit_package(
                    context.clone(),
                    vec![item].try_into().expect("Only one item"),
                    core,
                )
                .await
            {
                eprintln!("Error submitting package for core {core}: {e}");
                continue;
            };
            submitted += 1;
        }
        eprintln!("Submitted {submitted} work package(s) with anchor at #{anchor_slot}");
        Ok(())
    }

    /// Takes all queued items, submits them in packages and waits for every package.
    ///
    /// The queued items are split into chunks of at most `max_work_items()`; each chunk is
    /// submitted through `boot` on the configured core. Once all chunks are submitted, this
    /// call waits until every package is ready or has failed.
    ///
    /// # Errors
    ///
    /// Fails if the queue cannot be read, if any submission fails (in which case packages
    /// that were already submitted are not waited for), or if any package fails.
    pub async fn pack(&self, boot: &BootstrapService) -> anyhow::Result<()> {
        let items = queue::take()?;
        if items.is_empty() {
            eprintln!("No items in queue.");
            return Ok(());
        }
        let core = self.options.core();
        let packages = FuturesUnordered::new();
        for chunk in items.chunks(max_work_items()) {
            // Report the size of this package, not of the whole queue.
            let chunk_len = chunk.len();
            let work_items =
                chunk.to_vec().try_into().expect("Chunk size is bounded by max_work_items()");
            let (package_hash, anchor) = boot.submit_work_items(work_items, core).await?;
            eprintln!(
                "{chunk_len} item(s) submitted in package {package_hash} with anchor at #{}",
                anchor.slot
            );
            packages.push(
                wait_until_ready(self.node.as_ref(), anchor.header_hash, package_hash)
                    .map(move |result| (result, package_hash)),
            );
        }
        wait_until_all_ready(packages).await
    }

    /// Looks up `service_id` at the node's best block, prints its info and returns it.
    async fn query_service(&self, service_id: ServiceId) -> anyhow::Result<Service> {
        let best = self.node.best_block().await?;
        let (service, outer, maybe_inner) =
            self.node.query_service_and_corevm_guest(best.header_hash, service_id).await?;
        print_service_info(outer.as_ref(), maybe_inner.as_ref());
        Ok(service)
    }

    /// Builds a package from `items`, submits its encoding on `core` and returns its hash.
    ///
    /// The package hash is the hash of the encoded package; the builder's own `service_id` is
    /// used as the authorizer code host.
    async fn submit_package(
        &self,
        context: RefineContext,
        items: WorkItems,
        core: CoreIndex,
    ) -> anyhow::Result<WorkPackageHash> {
        let package = build_work_package(self.service_id, context, items);
        let encoded = package.encode();
        let package_hash = WorkPackageHash(hash_raw(&encoded));
        self.node.submit_encoded_work_package(core, encoded.into(), &[]).await?;
        Ok(package_hash)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks how a budget of 100 is shared between the entries that equal the maximum.
    #[test]
    fn distribute_gas_works() {
        // (initial limits, adjustable count, fixed total, expected limits)
        let cases: [([UnsignedGas; 3], usize, UnsignedGas, [UnsignedGas; 3]); 2] = [
            // Everything adjustable: the rounded-up share leaves less for the last entry.
            ([100, 100, 100], 3, 0, [34, 34, 32]),
            // One fixed entry: the remaining half is split evenly.
            ([50, 100, 100], 2, 50, [50, 25, 25]),
        ];
        for (mut limits, adjustable, fixed, expected) in cases {
            distribute_gas(limits.iter_mut(), 100, adjustable, fixed);
            assert_eq!(expected, limits);
        }
    }
}