use crate::LOG_TARGET;
use codec::Codec;
use cumulus_primitives_aura::Slot;
use cumulus_primitives_core::BlockT;
use sc_client_api::UsageProvider;
use sc_consensus_aura::SlotDuration;
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_application_crypto::AppPublic;
use sp_consensus_aura::AuraApi;
use sp_core::Pair;
use sp_runtime::traits::Member;
use sp_timestamp::Timestamp;
use std::{
cmp::{max, min},
sync::Arc,
time::Duration,
};
/// Lower bound applied to the block production interval when several blocks are attempted per
/// slot (see `compute_next_wake_up_time`). Together with `BLOCK_PRODUCTION_THRESHOLD_MS` it also
/// defines the smallest authoring duration accepted by `adjust_authoring_duration`.
// NOTE: despite the `_MS` suffix this is a `Duration`, not a raw millisecond count.
const BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS: Duration = Duration::from_millis(500);
/// Tolerance subtracted from `BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS` when checking whether a
/// clamped authoring duration is still long enough to be worth producing a block.
const BLOCK_PRODUCTION_THRESHOLD_MS: Duration = Duration::from_millis(100);
/// Time subtracted from the remaining time until the next slot change to obtain the authoring
/// deadline in `adjust_authoring_duration`.
const BLOCK_PRODUCTION_ADJUSTMENT_MS: Duration = Duration::from_millis(1000);
/// A timestamp paired with a slot.
// NOTE(review): this type is not used in the visible part of this file; presumably `timestamp`
// is the time the slot was derived from — confirm against the callers.
#[derive(Debug)]
pub(crate) struct SlotInfo {
/// Timestamp associated with `slot`.
pub timestamp: Timestamp,
/// The slot.
pub slot: Slot,
}
/// Timer that computes when the next block production attempt should happen and remembers the
/// last slot it reported.
#[derive(Debug)]
pub(crate) struct SlotTimer<Block, Client, P> {
/// Client used to query the slot duration, the best block hash and the authority set.
client: Arc<Client>,
/// Offset subtracted from the current time before any slot computation.
time_offset: Duration,
/// Core count most recently passed to `update_scheduling`; `None` until it is called.
last_reported_core_num: Option<u32>,
/// Slot duration of the relay chain.
relay_slot_duration: Duration,
/// Slot recorded by the last successful `wait_until_next_slot`; `None` before the first one.
last_reported_slot: Option<Slot>,
/// Ties the otherwise unused `Block` and `P` type parameters to the struct.
_marker: std::marker::PhantomData<(Block, Box<dyn Fn(P) + Send + Sync + 'static>)>,
}
/// Compute how long to wait until the next block production attempt and which slot that attempt
/// falls into.
///
/// By default the production interval is the shorter of the parachain and relay chain slot
/// durations. When more cores are assigned than parachain slots fit into one relay chain slot
/// (and the parachain slot is at least as long as the relay chain slot), the relay chain slot is
/// divided evenly between the cores instead, but never below
/// `BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS`.
///
/// `core_count` defaults to one core when `None`. `time_offset` is subtracted from `time_now`
/// before the next attempt is computed.
///
/// Returns the remaining wait time and the slot derived from the timestamp of the next attempt.
fn compute_next_wake_up_time(
    para_slot_duration: SlotDuration,
    relay_slot_duration: Duration,
    core_count: Option<u32>,
    time_now: Duration,
    time_offset: Duration,
) -> (Duration, Slot) {
    let para_duration = para_slot_duration.as_duration();
    let assigned_core_num = core_count.unwrap_or(1);
    let para_slots_per_relay_block =
        (relay_slot_duration.as_millis() / para_slot_duration.as_millis() as u128) as u32;

    // More cores than slots: several blocks have to be squeezed into a single slot.
    let multiple_blocks_per_slot =
        para_duration >= relay_slot_duration && assigned_core_num > para_slots_per_relay_block;

    let block_production_interval = if multiple_blocks_per_slot {
        let block_production_interval =
            max(relay_slot_duration / assigned_core_num, BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS);
        tracing::debug!(
            target: LOG_TARGET,
            ?block_production_interval,
            "Expected to produce for {assigned_core_num} cores but only have {para_slots_per_relay_block} slots. Attempting to produce multiple blocks per slot."
        );
        block_production_interval
    } else {
        // Trigger at least once per relay chain slot, even for longer parachain slots.
        min(para_duration, relay_slot_duration)
    };

    let (wait, next_attempt_timestamp) =
        time_until_next_attempt(time_now, block_production_interval, time_offset);
    (wait, Slot::from_timestamp(next_attempt_timestamp, para_slot_duration))
}
fn compute_time_until_next_slot_change(
para_slot_duration: SlotDuration,
time_now: Duration,
time_offset: Duration,
last_reported_slot: Slot,
) -> Option<(Duration, Slot)> {
let now = time_now.saturating_sub(time_offset);
let next_slot = last_reported_slot + Slot::from(1);
let Some(next_slot_timestamp) = next_slot.timestamp(para_slot_duration) else {
return None;
};
let remaining_time = next_slot_timestamp.as_duration().saturating_sub(now);
Some((remaining_time, next_slot))
}
/// Return the time elapsed since the Unix epoch according to the system clock.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn duration_now() -> Duration {
    use std::time::{SystemTime, UNIX_EPOCH};

    let now = SystemTime::now();
    match now.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch,
        Err(e) => {
            panic!("Current time {:?} is before Unix epoch. Something is wrong: {:?}", now, e)
        },
    }
}
/// Limit the authoring duration so that block production respects the upcoming slot change.
///
/// * `authoring_duration` - the authoring duration requested by the caller.
/// * `next_block` - time until the next block production attempt and the slot it falls into.
/// * `next_slot_change` - time until the next slot begins and that slot.
/// * `different_authors` - whether the next slot belongs to a different author.
///
/// The deadline is the time until the next slot minus `BLOCK_PRODUCTION_ADJUSTMENT_MS`. The
/// deadline is only enforced when the author changes; otherwise the authoring duration is merely
/// capped by the time until the next block.
///
/// Returns `None` when block production should be skipped: either the deadline has already
/// passed, or the duration clamped to the deadline is shorter than
/// `BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS - BLOCK_PRODUCTION_THRESHOLD_MS`.
fn adjust_authoring_duration(
    mut authoring_duration: Duration,
    next_block: (Duration, Slot),
    next_slot_change: (Duration, Slot),
    different_authors: bool,
) -> Option<Duration> {
    let (duration, next_block_slot) = next_block;
    let (duration_until_next_slot, next_slot) = next_slot_change;
    let duration_until_deadline =
        duration_until_next_slot.saturating_sub(BLOCK_PRODUCTION_ADJUSTMENT_MS);

    tracing::debug!(
        target: LOG_TARGET,
        ?authoring_duration,
        ?duration,
        ?next_block_slot,
        ?duration_until_next_slot,
        ?next_slot,
        ?duration_until_deadline,
        ?different_authors,
        "Adjusting authoring duration for slot.",
    );

    // Same author in the next slot: the deadline does not apply, only the next block does.
    if !different_authors {
        return Some(authoring_duration.min(duration));
    }

    // The author changes and the deadline has already been reached.
    if duration_until_deadline.is_zero() {
        tracing::debug!(
            target: LOG_TARGET,
            ?duration_until_next_slot,
            ?next_slot,
            "Not enough time left in the slot to adjust authoring duration. Skipping block production for the slot."
        );
        return None;
    }

    // Clamp to the deadline, and give up if what remains is too short to be useful.
    if authoring_duration >= duration_until_deadline {
        authoring_duration = duration_until_deadline;

        let shortest_allowed =
            BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS.saturating_sub(BLOCK_PRODUCTION_THRESHOLD_MS);
        if authoring_duration < shortest_allowed {
            tracing::debug!(
                target: LOG_TARGET,
                ?authoring_duration,
                ?next_slot,
                "Authoring duration is below minimum. Skipping block production for the slot."
            );
            return None;
        }
    }

    Some(authoring_duration.min(duration))
}
/// Compute the time remaining until the next multiple of `block_production_interval`.
///
/// `offset` is subtracted from `now` (saturating at zero) before the computation. A time that is
/// exactly on an interval boundary yields a full interval of waiting, not zero.
///
/// Returns the remaining wait time and the timestamp (in milliseconds) of the next boundary.
///
/// # Panics
///
/// Panics if `block_production_interval` is shorter than one millisecond (division by zero).
fn time_until_next_attempt(
    now: Duration,
    block_production_interval: Duration,
    offset: Duration,
) -> (Duration, Timestamp) {
    let interval_ms = block_production_interval.as_millis();
    let adjusted_now_ms = now.as_millis().saturating_sub(offset.as_millis());

    // Start of the interval that follows the one `adjusted_now_ms` currently lies in.
    let next_boundary_ms = (adjusted_now_ms / interval_ms + 1) * interval_ms;
    let wait_ms = next_boundary_ms - adjusted_now_ms;

    (Duration::from_millis(wait_ms as u64), Timestamp::from(next_boundary_ms as u64))
}
impl<Block, Client, P> SlotTimer<Block, Client, P>
where
    Block: BlockT,
    Client: ProvideRuntimeApi<Block> + UsageProvider<Block> + Send + Sync + 'static,
    Client::Api: AuraApi<Block, P::Public>,
    P: Pair,
    P::Public: AppPublic + Member + Codec,
    P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
    /// Create a new slot timer.
    ///
    /// `time_offset` is subtracted from the current time in all slot computations. No core count
    /// and no slot are recorded initially.
    pub fn new_with_offset(
        client: Arc<Client>,
        time_offset: Duration,
        relay_slot_duration: Duration,
    ) -> Self {
        Self {
            client,
            time_offset,
            last_reported_core_num: None,
            relay_slot_duration,
            last_reported_slot: Default::default(),
            _marker: Default::default(),
        }
    }

    /// Record the number of cores for the next block. The value is used by
    /// [`Self::time_until_next_block`] to pick the block production interval.
    pub fn update_scheduling(&mut self, num_cores_next_block: u32) {
        self.last_reported_core_num = Some(num_cores_next_block);
    }

    /// Return the time until the next block production attempt and the slot it falls into,
    /// based on the current system time.
    pub fn time_until_next_block(&mut self, slot_duration: SlotDuration) -> (Duration, Slot) {
        compute_next_wake_up_time(
            slot_duration,
            self.relay_slot_duration,
            self.last_reported_core_num,
            duration_now(),
            self.time_offset,
        )
    }

    /// Return the time until the slot after the last reported one begins, together with that
    /// slot, or `None` if its timestamp cannot be computed.
    ///
    /// When no slot has been reported yet, the default `Slot` value is used as the reference.
    // NOTE(review): with no reported slot the "next slot" is relative to the default slot rather
    // than to the current time — confirm this is the intended behavior before the first report.
    fn time_until_next_slot_change(
        &mut self,
        slot_duration: SlotDuration,
    ) -> Option<(Duration, Slot)> {
        compute_time_until_next_slot_change(
            slot_duration,
            duration_now(),
            self.time_offset,
            self.last_reported_slot.unwrap_or_default(),
        )
    }

    /// Check whether `slot` and `next_slot` map to different entries of the authority set at the
    /// best block.
    ///
    /// Returns `true` when the authorities cannot be fetched, and `false` when there is at most
    /// one authority.
    fn check_different_slot_authors(&self, slot: Slot, next_slot: Slot) -> bool {
        let best_hash = self.client.usage_info().chain.best_hash;
        let mut runtime_api = self.client.runtime_api();
        runtime_api.set_call_context(sp_core::traits::CallContext::Onchain);
        let Ok(authorities) = runtime_api.authorities(best_hash) else {
            // Without the authority set, assume the author changes.
            return true;
        };
        let authorities_len = authorities.len() as u64;
        // Also guards the modulo below against an empty authority set.
        if authorities_len <= 1 {
            return false;
        }
        let author1_idx = *slot % authorities_len;
        let author2_idx = *next_slot % authorities_len;
        author1_idx != author2_idx
    }

    /// Adjust `authoring_duration` so that authoring respects the next slot change.
    ///
    /// Returns `None` when the slot duration cannot be fetched from the runtime or when block
    /// production should be skipped. Returns the unadjusted `authoring_duration` when the time
    /// until the next slot change cannot be computed.
    pub fn adjust_authoring_duration(&mut self, authoring_duration: Duration) -> Option<Duration> {
        let slot_duration = match crate::slot_duration(&*self.client) {
            Ok(slot_duration) => slot_duration,
            Err(error) => {
                // Include the error, like `wait_until_next_slot` does, so the cause is not lost.
                tracing::error!(target: LOG_TARGET, %error, "Failed to fetch slot duration from runtime.");
                return None;
            },
        };
        let next_block = self.time_until_next_block(slot_duration);
        let Some(next_slot_change) = self.time_until_next_slot_change(slot_duration) else {
            tracing::error!(
                target: LOG_TARGET,
                "Failed to compute time until next slot change. Using unadjusted authoring duration."
            );
            return Some(authoring_duration);
        };
        // Fall back to the slot of the next block when no slot has been reported yet.
        let current_slot = self.last_reported_slot.unwrap_or(next_block.1);
        let different_authors = self.check_different_slot_authors(current_slot, next_slot_change.1);
        adjust_authoring_duration(
            authoring_duration,
            next_block,
            next_slot_change,
            different_authors,
        )
    }

    /// Wait for the next block production opportunity and record its slot as the last reported
    /// slot.
    ///
    /// If the computed slot is two or three slots ahead of the last reported one, the method
    /// does not sleep and reports the slot directly after the last reported one instead. In all
    /// other cases it sleeps until the next attempt.
    ///
    /// Returns `Err(())` when the slot duration cannot be fetched from the runtime.
    pub async fn wait_until_next_slot(&mut self) -> Result<(), ()> {
        let slot_duration = match crate::slot_duration(&*self.client) {
            Ok(d) => d,
            Err(error) => {
                tracing::error!(target: LOG_TARGET, %error, "Failed to fetch slot duration from runtime.");
                return Err(());
            },
        };
        let (time_until_next_attempt, mut next_aura_slot) =
            self.time_until_next_block(slot_duration);
        tracing::trace!(
            target: LOG_TARGET,
            ?time_until_next_attempt,
            aura_slot = ?next_aura_slot,
            last_reported = ?self.last_reported_slot,
            "Determined next block production opportunity."
        );
        match self.last_reported_slot {
            // A small gap since the last report: return immediately with the very next slot.
            Some(ls) if ls + 1 < next_aura_slot && next_aura_slot <= ls + 3 => {
                next_aura_slot = ls + 1u64;
            },
            None | Some(_) => {
                tracing::trace!(target: LOG_TARGET, ?time_until_next_attempt, "Sleeping until the next slot.");
                tokio::time::sleep(time_until_next_attempt).await;
            },
        }
        tracing::debug!(
            target: LOG_TARGET,
            ?slot_duration,
            aura_slot = ?next_aura_slot,
            "New block production opportunity."
        );
        self.last_reported_slot = Some(next_aura_slot);
        Ok(())
    }
}
// Unit tests for the pure timing helpers of the slot timer.
#[cfg(test)]
mod tests {
use super::*;
use rstest::rstest;
use sc_consensus_aura::SlotDuration;
// Relay chain slot duration in milliseconds used by the wake-up time cases.
const RELAY_CHAIN_SLOT_DURATION: u64 = 6000;
// Checks the wait time returned by `compute_next_wake_up_time`.
// Case columns: para slot duration (ms), core count, current time (ms), time offset (ms),
// expected wait (ms).
#[rstest]
// 6s para slots, zero offset, zero/one/unspecified core count.
#[case(6000, Some(1), 1000, 0, 5000)]
#[case(6000, Some(1), 0, 0, 6000)]
#[case(6000, Some(1), 6000, 0, 6000)]
#[case(6000, Some(0), 6000, 0, 6000)]
#[case(6000, None, 1000, 0, 5000)]
#[case(6000, None, 0, 0, 6000)]
#[case(6000, None, 6000, 0, 6000)]
// 6s para slots with a non-zero time offset.
#[case(6000, Some(1), 1000, 1000, 6000)]
#[case(6000, Some(1), 12000, 2000, 2000)]
#[case(6000, Some(1), 12000, 6000, 6000)]
#[case(6000, Some(1), 12000, 7000, 1000)]
// 6s para slots with multiple cores; the last two hit the 500ms minimum interval.
#[case(6000, Some(3), 12000, 0, 2000)]
#[case(6000, Some(2), 12000, 0, 3000)]
#[case(6000, Some(3), 11999, 0, 1)]
#[case(6000, Some(12), 0, 0, 500)]
#[case(6000, Some(100), 0, 0, 500)]
// 2s para slots (shorter than the relay chain slot).
#[case(2000, Some(1), 1000, 0, 1000)]
#[case(2000, Some(1), 3000, 0, 1000)]
#[case(2000, Some(1), 10000, 0, 2000)]
#[case(2000, Some(2), 1000, 0, 1000)]
#[case(2000, Some(3), 3000, 0, 1000)]
// 12s para slots (longer than the relay chain slot).
#[case(12000, None, 0, 0, 6000)]
#[case(12000, None, 6100, 0, 5900)]
#[case(12000, None, 6000, 2000, 2000)]
#[case(12000, Some(2), 6000, 0, 3000)]
#[case(12000, Some(3), 6000, 0, 2000)]
#[case(12000, Some(3), 8100, 0, 1900)]
fn test_get_next_slot(
#[case] para_slot_millis: u64,
#[case] core_count: Option<u32>,
#[case] time_now: u64,
#[case] offset_millis: u64,
#[case] expected_wait_duration: u128,
) {
let para_slot_duration = SlotDuration::from_millis(para_slot_millis); let relay_slot_duration = Duration::from_millis(RELAY_CHAIN_SLOT_DURATION);
let time_now = Duration::from_millis(time_now); let offset = Duration::from_millis(offset_millis);
// Only the wait duration is checked here; the returned slot is ignored.
let (wait_duration, _) = compute_next_wake_up_time(
para_slot_duration,
relay_slot_duration,
core_count,
time_now,
offset,
);
assert_eq!(wait_duration.as_millis(), expected_wait_duration, "Wait time mismatch.");
}
// Checks `compute_time_until_next_slot_change`.
// Case columns: para slot duration (ms), current time (ms), time offset (ms), last reported
// slot, expected remaining time (ms), expected next slot.
#[rstest]
#[case(6000, 0, 0, Slot::from(0), 6000, Slot::from(1))]
#[case(6000, 1000, 0, Slot::from(0), 5000, Slot::from(1))]
#[case(6000, 6000, 0, Slot::from(1), 6000, Slot::from(2))]
#[case(6000, 12000, 0, Slot::from(2), 6000, Slot::from(3))]
#[case(6000, 1000, 1000, Slot::from(0), 6000, Slot::from(1))]
#[case(6000, 2000, 1000, Slot::from(0), 5000, Slot::from(1))]
#[case(6000, 6000, 3000, Slot::from(0), 3000, Slot::from(1))]
#[case(3000, 1000, 0, Slot::from(0), 2000, Slot::from(1))]
#[case(3000, 3000, 0, Slot::from(1), 3000, Slot::from(2))]
#[case(12000, 6000, 0, Slot::from(0), 6000, Slot::from(1))]
#[case(12000, 12000, 0, Slot::from(1), 12000, Slot::from(2))]
#[case(6000, 5999, 0, Slot::from(0), 1, Slot::from(1))]
#[case(6000, 11999, 0, Slot::from(1), 1, Slot::from(2))]
fn test_compute_time_until_next_slot_change(
#[case] para_slot_millis: u64,
#[case] time_now: u64,
#[case] offset_millis: u64,
#[case] last_reported_slot: Slot,
#[case] expected_duration: u128,
#[case] expected_next_slot: Slot,
) {
let para_slot_duration = SlotDuration::from_millis(para_slot_millis);
let time_now = Duration::from_millis(time_now);
let offset = Duration::from_millis(offset_millis);
let result = compute_time_until_next_slot_change(
para_slot_duration,
time_now,
offset,
last_reported_slot,
);
assert!(result.is_some(), "Expected result to be Some");
let (duration, next_slot) = result.unwrap();
assert_eq!(duration.as_millis(), expected_duration, "Duration mismatch");
assert_eq!(next_slot, expected_next_slot, "Next slot mismatch");
}
// Checks the free function `adjust_authoring_duration`.
// Each case lists: requested authoring duration, (time until next block, its slot),
// (time until next slot change, that slot), whether the authors differ, expected result.
#[rstest]
#[case::blocks_2s_fits_next_block(
Duration::from_millis(2000), // Authoring duration
(Duration::from_millis(2000), Slot::from(1)), // Next block
(Duration::from_millis(4000), Slot::from(2)), // Next slot change
true, // Different authors
Some(Duration::from_millis(2000)), // Expected
)]
#[case::blocks_2s_closer_next_slot(
Duration::from_millis(2000), // Authoring duration
(Duration::from_millis(1950), Slot::from(1)), // Next block
(Duration::from_millis(4000), Slot::from(2)), // Next slot change
true, // Different authors
Some(Duration::from_millis(1950)), // Expected
)]
#[case::blocks_2s_closer_next_slot_bigger(
Duration::from_millis(2000), // Authoring duration
(Duration::from_millis(1500), Slot::from(1)), // Next block
(Duration::from_millis(4000), Slot::from(2)), // Next slot change
true, // Different authors
Some(Duration::from_millis(1500)), // Expected
)]
#[case::blocks_2s_reduce_by_1s(
Duration::from_millis(2000), // Authoring duration
(Duration::from_millis(2000), Slot::from(1)), // Next block
(Duration::from_millis(2000), Slot::from(2)), // Next slot change
true, // Different authors
Some(Duration::from_millis(1000)), // Expected
)]
#[case::blocks_2s_reduce_by_1s_plus_offset(
Duration::from_millis(2000), // Authoring duration
(Duration::from_millis(1950), Slot::from(1)), // Next block
(Duration::from_millis(1950), Slot::from(2)), // Next slot change
true, // Different authors
Some(Duration::from_millis(950)), // Expected
)]
#[case::blocks_2s_reduce_to_minimum(
Duration::from_millis(2000), // Authoring duration
(Duration::from_millis(1400), Slot::from(1)), // Next block
(Duration::from_millis(1400), Slot::from(2)), // Next slot change
true, // Different authors
Some(Duration::from_millis(400)), // Expected
)]
#[case::blocks_2s_reduce_below_minimum(
Duration::from_millis(2000), // Authoring duration
(Duration::from_millis(1300), Slot::from(1)), // Next block
(Duration::from_millis(1300), Slot::from(2)), // Next slot change
true, // Different authors
None, // Expected to reduce below minimum
)]
#[case::blocks_2s_same_author(
Duration::from_millis(2000), // Authoring duration
(Duration::from_millis(1400), Slot::from(1)), // Next block
(Duration::from_millis(1400), Slot::from(2)), // Next slot change
false, // Different authors
Some(Duration::from_millis(1400)), // Expected no adjustment for last second.
)]
#[case::blocks_500ms_fits_next_block(
Duration::from_millis(500), // Authoring duration
(Duration::from_millis(500), Slot::from(1)), // Next block
(Duration::from_millis(2000), Slot::from(2)), // Next slot change
true, // Different authors
Some(Duration::from_millis(500)), // Expected
)]
#[case::blocks_500ms_closer_next_slot(
Duration::from_millis(500), // Authoring duration
(Duration::from_millis(450), Slot::from(1)), // Next block
(Duration::from_millis(2000), Slot::from(2)), // Next slot change
true, // Different authors
Some(Duration::from_millis(450)), // Expected
)]
#[case::blocks_500ms_closer_next_slot_bigger(
Duration::from_millis(500), // Authoring duration
(Duration::from_millis(400), Slot::from(1)), // Next block
(Duration::from_millis(1500), Slot::from(2)), // Next slot change
true, // Different authors
Some(Duration::from_millis(400)), // Expected
)]
#[case::blocks_500ms_reduce_by_1s(
Duration::from_millis(500), // Authoring duration
(Duration::from_millis(500), Slot::from(1)), // Next block
(Duration::from_millis(1000), Slot::from(2)), // Next slot change
true, // Different authors
None, // Expected
)]
#[case::blocks_500ms_reduce_by_1s_closer(
Duration::from_millis(500), // Authoring duration
(Duration::from_millis(500), Slot::from(1)), // Next block
(Duration::from_millis(500), Slot::from(2)), // Next slot change
true, // Different authors
None, // Expected
)]
#[case::blocks_500ms_same_author(
Duration::from_millis(500), // Authoring duration
(Duration::from_millis(410), Slot::from(1)), // Next block
(Duration::from_millis(1000), Slot::from(2)), // Next slot change
false, // Different authors
Some(Duration::from_millis(410)), // Expected no adjustment for last second.
)]
#[case::blocks_500ms_same_author_closer(
Duration::from_millis(500), // Authoring duration
(Duration::from_millis(400), Slot::from(1)), // Next block
(Duration::from_millis(400), Slot::from(2)), // Next slot change
false, // Different authors
Some(Duration::from_millis(400)), // Expected no adjustment for last second.
)]
fn test_adjust_authoring_duration(
#[case] authoring_duration: Duration,
#[case] next_block: (Duration, Slot),
#[case] next_slot_change: (Duration, Slot),
#[case] different_authors: bool,
#[case] expected: Option<Duration>,
) {
// Initialize test logging before calling the function under test.
sp_tracing::init_for_tests();
let result = adjust_authoring_duration(
authoring_duration,
next_block,
next_slot_change,
different_authors,
);
tracing::debug!("Adjusted authoring duration: {:?}", result);
assert_eq!(result, expected);
}
}