use std::sync::Arc;
use derive_builder::Builder;
use crate::blocks::BlockMetadata;
use crate::manager::BlockManager;
use super::request::RequestSequence;
use dynamo_tokens::Token;
/// Scheduling state of a `SchedulableSequence`.
///
/// Each `schedule_*` call moves the sequence from `Idle` into the matching
/// `*Scheduled` variant; the matching `apply_*` call or `revert_schedule`
/// moves it back to `Idle`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SequenceState {
/// No operation is pending.
Idle,
/// A prefill chunk has been scheduled and awaits `apply_prefill`.
PrefillScheduled {
/// Number of input tokens covered by the scheduled chunk.
num_tokens: usize,
/// Blocks newly allocated by the schedule call (popped again on revert).
blocks_allocated: usize,
},
/// A single-token decode step has been scheduled and awaits `apply_decode`.
DecodeScheduled {
/// Blocks newly allocated by the schedule call (popped again on revert).
blocks_allocated: usize,
},
/// A speculative step has been scheduled and awaits `apply_speculative`.
SpeculativeScheduled {
/// Number of draft tokens scheduled (already clamped to the remaining budget).
num_tokens: usize,
/// Blocks newly allocated by the schedule call (popped again on revert).
blocks_allocated: usize,
},
}
/// Result of applying a decode or speculative step.
///
/// Combines two independent facts: whether appending the token(s) completed a
/// block, and whether the underlying sequence reports itself complete.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DecodeOutcome {
/// No block was completed and the sequence is not complete.
Continue,
/// At least one block was completed; the sequence is not complete.
BlockCompleted,
/// The sequence is complete; no block was completed by this step.
MaxLength,
/// A block was completed and the sequence is complete.
BlockCompletedAndMaxLength,
}
/// Lifecycle notifications delivered to a `SequenceDelegate`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SequenceEvent {
/// Emitted once from `SchedulableSequence::new`.
Created {
num_input_tokens: usize,
max_output_tokens: usize,
block_size: usize,
},
/// Emitted by `match_and_add_prefix`, including when zero blocks matched.
PrefixMatched {
blocks_matched: usize,
},
/// Emitted when a prefill chunk is successfully scheduled.
PrefillScheduled {
num_tokens: usize,
blocks_allocated: usize,
},
/// Emitted when a scheduled prefill chunk is applied.
PrefillApplied {
num_tokens: usize,
blocks_registered: usize,
/// Whether a generated token was appended with this chunk.
token_emitted: bool,
},
/// Emitted when a decode step is successfully scheduled.
DecodeScheduled {
blocks_allocated: usize,
},
/// Emitted when a scheduled decode step is applied.
DecodeApplied {
token: Token,
block_completed: bool,
},
/// Emitted when a speculative step is successfully scheduled.
SpeculativeScheduled {
num_tokens: usize,
blocks_allocated: usize,
},
/// Emitted when a scheduled speculative step is applied.
SpeculativeApplied {
/// Number of tokens actually accepted.
accepted: usize,
/// Number of draft tokens that had been scheduled.
scheduled: usize,
/// Surplus unassigned blocks dropped after applying.
blocks_released: usize,
},
/// Emitted by `revert_schedule`.
ScheduleReverted {
blocks_released: usize,
},
/// Emitted by `drop_unassigned` when at least one block was dropped.
UnassignedDropped {
count: usize,
},
/// Emitted by `release`.
Released,
/// Emitted by `reacquire`.
Reacquired {
prefix_matched: usize,
success: bool,
},
}
/// Observer of `SequenceEvent`s produced by a `SchedulableSequence`.
///
/// Implementations receive events through a shared reference and must be
/// `Send + Sync`.
pub trait SequenceDelegate: Send + Sync {
/// Called with each event as it is emitted by the sequence.
fn on_event(&self, event: &SequenceEvent);
}
/// Delegate that ignores every event; used when no delegate is supplied.
pub struct NoopDelegate;
impl SequenceDelegate for NoopDelegate {
// Intentionally empty: events are discarded.
fn on_event(&self, _event: &SequenceEvent) {}
}
/// Parameter bag from which `derive_builder` generates
/// `SchedulableSequenceBuilder`.
///
/// The generated build function is private and named `build_params`; the
/// public entry point is the hand-written `SchedulableSequenceBuilder::build`.
#[doc(hidden)]
#[derive(Builder)]
#[builder(
name = "SchedulableSequenceBuilder",
pattern = "owned",
build_fn(private, name = "build_params", error = "anyhow::Error")
)]
pub struct SchedulableSequenceParams {
// Tokens handed to `RequestSequence::new` as the sequence's initial tokens.
tokens: Vec<Token>,
// Output-token budget forwarded to `RequestSequence::new`.
max_output_tokens: usize,
// Block size forwarded to `RequestSequence::new`.
block_size: u32,
// Optional event observer; the setter is hand-written on the builder.
#[builder(default, setter(custom))]
delegate: Option<Arc<dyn SequenceDelegate>>,
}
impl SchedulableSequenceBuilder {
pub fn delegate(mut self, delegate: Arc<dyn SequenceDelegate>) -> Self {
self.delegate = Some(Some(delegate));
self
}
pub fn build<T: BlockMetadata>(self) -> anyhow::Result<SchedulableSequence<T>> {
let params = self.build_params()?;
Ok(SchedulableSequence::new(
params.tokens,
params.max_output_tokens,
params.block_size,
params.delegate,
))
}
}
/// Errors returned by the `schedule_*` methods and `match_and_add_prefix`.
#[derive(Debug, thiserror::Error)]
pub enum ScheduleError {
/// The sequence already has a scheduled operation pending.
#[error("expected Idle state, got {state:?}")]
NotIdle { state: SequenceState },
/// The requested chunk would extend past the end of the input tokens.
#[error("prefill overrun: position {position} + {num_tokens} > {num_input_tokens}")]
PrefillOverrun {
position: usize,
num_tokens: usize,
num_input_tokens: usize,
},
/// Prefill was requested although all input tokens are already prefilled.
#[error("prefill already complete")]
PrefillComplete,
/// Decode or speculative scheduling was requested before prefill finished.
#[error("prefill not yet complete (position {position} < {num_input_tokens})")]
PrefillNotComplete {
position: usize,
num_input_tokens: usize,
},
/// The underlying sequence could not allocate the required blocks.
#[error("allocation failed: needed {needed} blocks")]
AllocationFailed { needed: usize },
/// The underlying sequence already reports itself complete.
#[error("generation complete: {generated} >= {max_output}")]
GenerationComplete { generated: usize, max_output: usize },
/// The number of tokens beyond `kv_position` is not the expected count.
#[error("expected {expected} dangling token(s), got {actual}")]
WrongDanglingCount { expected: usize, actual: usize },
}
/// Errors returned by the `apply_*` methods, `revert_schedule`, `release`,
/// `reacquire` and `append_tokens`.
#[derive(Debug, thiserror::Error)]
pub enum ApplyError {
/// The sequence is not in the state the operation requires.
#[error("expected {expected}, got {actual:?}")]
WrongState {
expected: &'static str,
actual: SequenceState,
},
/// A generated token was supplied for a prefill chunk that does not finish prefill.
#[error("token provided but prefill not completing this chunk")]
TokenOnNonFinalChunk,
/// More tokens were accepted than had been scheduled speculatively.
#[error("accepted {accepted} tokens exceeds scheduled {scheduled}")]
AcceptedExceedsScheduled { accepted: usize, scheduled: usize },
/// The final prefill chunk was applied without a token although output is expected.
#[error("final prefill chunk requires a generated token")]
MissingTokenOnFinalChunk,
/// `append_tokens` was asked to append more tokens than remain in the budget.
#[error("append requested {requested} tokens but only {remaining} remain")]
AppendExceedsRemaining { requested: usize, remaining: usize },
}
// Generates `&self` getter methods that forward to the same-named method on
// `self.inner`. Accepts zero or more signatures of the form
// `[attrs] [vis] fn name(&self) -> Ret;` and preserves attributes and visibility.
macro_rules! delegate_to_inner {
( $( $(#[$meta:meta])* $vis:vis fn $name:ident(&self) -> $ret:ty; )* ) => {
$( $(#[$meta])* $vis fn $name(&self) -> $ret { self.inner.$name() } )*
};
}
/// A `RequestSequence` wrapped in a schedule/apply state machine.
///
/// Operations come in pairs: a `schedule_*` call reserves blocks and records
/// the pending operation in `state`; the matching `apply_*` call commits it,
/// or `revert_schedule` undoes the reservation. Every transition is reported
/// to the delegate.
pub struct SchedulableSequence<T: BlockMetadata> {
// Underlying sequence holding tokens and block assignments.
inner: RequestSequence<T>,
// Currently pending operation, or `Idle`.
state: SequenceState,
// Number of input tokens covered so far by prefix matching and applied prefill chunks.
prefill_position: usize,
// Set to `prefill_position` on prefill/prefix match, then advanced by each applied decoded token.
kv_position: usize,
// Receiver of lifecycle events; `NoopDelegate` when none was supplied.
delegate: Arc<dyn SequenceDelegate>,
}
impl<T: BlockMetadata> SchedulableSequence<T> {
pub fn builder() -> SchedulableSequenceBuilder {
SchedulableSequenceBuilder::default()
}
/// Creates an idle sequence over `tokens` and emits [`SequenceEvent::Created`].
///
/// `delegate` defaults to [`NoopDelegate`] when `None`. Both positions start
/// at zero.
pub fn new(
    tokens: Vec<Token>,
    max_output_tokens: usize,
    block_size: u32,
    delegate: Option<Arc<dyn SequenceDelegate>>,
) -> Self {
    let inner = RequestSequence::new(tokens, max_output_tokens, block_size);
    let delegate: Arc<dyn SequenceDelegate> = match delegate {
        Some(custom) => custom,
        None => Arc::new(NoopDelegate),
    };
    let created = SequenceEvent::Created {
        num_input_tokens: inner.num_input_tokens(),
        max_output_tokens,
        block_size: block_size as usize,
    };
    delegate.on_event(&created);
    Self {
        inner,
        state: SequenceState::Idle,
        prefill_position: 0,
        kv_position: 0,
        delegate,
    }
}
/// Matches already-cached prefix blocks via the inner sequence and advances
/// the prefill and KV positions past them.
///
/// Emits [`SequenceEvent::PrefixMatched`] even when nothing matched and
/// returns the number of matched blocks.
///
/// # Errors
/// [`ScheduleError::NotIdle`] when an operation is pending.
///
/// # Panics
/// Panics if the inner prefix match reports an error.
pub fn match_and_add_prefix(
    &mut self,
    manager: &BlockManager<T>,
) -> Result<usize, ScheduleError> {
    self.require_idle()?;
    let blocks_matched = match self.inner.match_and_add_prefix(manager) {
        Ok(matched) => matched,
        Err(_) => panic!("prefix match should not produce duplicates"),
    };
    if blocks_matched > 0 {
        // Each matched block covers `block_size` input tokens.
        self.prefill_position += blocks_matched * self.inner.block_size();
        self.kv_position = self.prefill_position;
    }
    self.delegate
        .on_event(&SequenceEvent::PrefixMatched { blocks_matched });
    Ok(blocks_matched)
}
/// Schedules a prefill chunk of `num_tokens` input tokens, allocating any
/// blocks needed to hold the input up to the end of the chunk.
///
/// On success the state becomes [`SequenceState::PrefillScheduled`] and
/// [`SequenceEvent::PrefillScheduled`] is emitted. On error the state is
/// left unchanged.
///
/// # Errors
/// * [`ScheduleError::NotIdle`] when an operation is already pending.
/// * [`ScheduleError::PrefillComplete`] when all input is already prefilled.
/// * [`ScheduleError::PrefillOverrun`] when the chunk would run past the
///   input, including when `prefill_position + num_tokens` does not fit in
///   `usize`.
/// * [`ScheduleError::AllocationFailed`] when blocks cannot be allocated.
pub fn schedule_prefill(
    &mut self,
    num_tokens: usize,
    manager: &BlockManager<T>,
) -> Result<(), ScheduleError> {
    self.require_idle()?;
    if self.is_prefill_complete() {
        return Err(ScheduleError::PrefillComplete);
    }
    let num_input = self.inner.num_input_tokens();
    // `checked_add` keeps a huge `num_tokens` from panicking (debug) or
    // wrapping around and slipping past the overrun check (release).
    let new_position = match self.prefill_position.checked_add(num_tokens) {
        Some(end) if end <= num_input => end,
        _ => {
            return Err(ScheduleError::PrefillOverrun {
                position: self.prefill_position,
                num_tokens,
                num_input_tokens: num_input,
            });
        }
    };
    let bs = self.inner.block_size();
    let total_blocks_needed = new_position.div_ceil(bs);
    // Blocks in any stage already count towards the requirement.
    let already_have = self.inner.assigned_blocks()
        + self.inner.staged_blocks()
        + self.inner.unassigned_blocks();
    let to_allocate = total_blocks_needed.saturating_sub(already_have);
    if to_allocate > 0 && !self.inner.allocate_blocks(to_allocate, manager) {
        return Err(ScheduleError::AllocationFailed {
            needed: to_allocate,
        });
    }
    self.state = SequenceState::PrefillScheduled {
        num_tokens,
        blocks_allocated: to_allocate,
    };
    self.delegate.on_event(&SequenceEvent::PrefillScheduled {
        num_tokens,
        blocks_allocated: to_allocate,
    });
    Ok(())
}
/// Commits the scheduled prefill chunk and returns the sequence to `Idle`.
///
/// `token` is the first generated token; it may only be supplied with the
/// chunk that finishes prefill, and is required there whenever the output
/// budget is non-zero. Emits [`SequenceEvent::PrefillApplied`].
///
/// # Errors
/// * [`ApplyError::WrongState`] when no prefill is scheduled.
/// * [`ApplyError::TokenOnNonFinalChunk`] when a token accompanies a
///   non-final chunk.
/// * [`ApplyError::MissingTokenOnFinalChunk`] when the final chunk has no
///   token although `max_output_tokens > 0`.
///
/// Validation errors leave the scheduled state in place.
pub fn apply_prefill(
    &mut self,
    token: Option<Token>,
    manager: &BlockManager<T>,
) -> Result<(), ApplyError> {
    let SequenceState::PrefillScheduled { num_tokens, .. } = self.state else {
        return Err(ApplyError::WrongState {
            expected: "PrefillScheduled",
            actual: self.state,
        });
    };
    let chunk_end = self.prefill_position + num_tokens;
    let finishes_prefill = chunk_end >= self.inner.num_input_tokens();
    match (token.is_some(), finishes_prefill) {
        (true, false) => return Err(ApplyError::TokenOnNonFinalChunk),
        (false, true) if self.inner.max_output_tokens() > 0 => {
            return Err(ApplyError::MissingTokenOnFinalChunk);
        }
        _ => {}
    }
    let assigned_before = self.inner.assigned_blocks();
    self.inner.complete_and_register_pending(manager);
    self.prefill_position = chunk_end;
    self.kv_position = chunk_end;
    let token_emitted = token.is_some();
    if let Some(first_token) = token {
        self.inner.append_token(first_token);
    }
    // Newly assigned blocks plus whatever is currently staged.
    let blocks_registered =
        self.inner.assigned_blocks() - assigned_before + self.inner.staged_blocks();
    self.state = SequenceState::Idle;
    let applied = SequenceEvent::PrefillApplied {
        num_tokens,
        blocks_registered,
        token_emitted,
    };
    self.delegate.on_event(&applied);
    Ok(())
}
/// Schedules a single-token decode step.
///
/// Ensures there is one unassigned block for every complete-but-unregistered
/// block plus one more, allocating the shortfall. On success the state
/// becomes [`SequenceState::DecodeScheduled`] and the matching event is
/// emitted.
///
/// # Errors
/// [`ScheduleError::NotIdle`], [`ScheduleError::PrefillNotComplete`],
/// [`ScheduleError::GenerationComplete`],
/// [`ScheduleError::WrongDanglingCount`] or
/// [`ScheduleError::AllocationFailed`].
pub fn schedule_decode(&mut self, manager: &BlockManager<T>) -> Result<(), ScheduleError> {
    self.require_idle()?;
    self.require_prefill_complete()?;
    self.require_not_complete()?;
    self.require_one_dangling()?;

    let complete_blocks = self.inner.complete_sequence_blocks();
    let registered_blocks = self.inner.assigned_blocks() + self.inner.staged_blocks();
    let unregistered = complete_blocks.saturating_sub(registered_blocks);
    let required = unregistered + 1;
    let blocks_allocated = required.saturating_sub(self.inner.unassigned_blocks());

    if blocks_allocated > 0 {
        let granted = self.inner.allocate_blocks(blocks_allocated, manager);
        if !granted {
            return Err(ScheduleError::AllocationFailed {
                needed: blocks_allocated,
            });
        }
    }

    self.state = SequenceState::DecodeScheduled { blocks_allocated };
    self.delegate
        .on_event(&SequenceEvent::DecodeScheduled { blocks_allocated });
    Ok(())
}
/// Commits the scheduled decode step by appending `token`.
///
/// Registers pending blocks, advances `kv_position` by one, returns to
/// `Idle` and emits [`SequenceEvent::DecodeApplied`]. The outcome reports
/// whether a block was completed and whether the sequence is now complete.
///
/// # Errors
/// [`ApplyError::WrongState`] when no decode step is scheduled.
pub fn apply_decode(
    &mut self,
    token: Token,
    manager: &BlockManager<T>,
) -> Result<DecodeOutcome, ApplyError> {
    let SequenceState::DecodeScheduled { .. } = self.state else {
        return Err(ApplyError::WrongState {
            expected: "DecodeScheduled",
            actual: self.state,
        });
    };
    let block_completed = self.inner.append_token(token).is_some();
    self.inner.complete_and_register_pending(manager);
    self.kv_position += 1;
    self.state = SequenceState::Idle;
    let applied = SequenceEvent::DecodeApplied {
        token,
        block_completed,
    };
    self.delegate.on_event(&applied);

    let outcome = if self.inner.is_complete() {
        if block_completed {
            DecodeOutcome::BlockCompletedAndMaxLength
        } else {
            DecodeOutcome::MaxLength
        }
    } else if block_completed {
        DecodeOutcome::BlockCompleted
    } else {
        DecodeOutcome::Continue
    };
    Ok(outcome)
}
/// Schedules a speculative step of up to `num_draft_tokens` draft tokens.
///
/// The draft count is clamped to the remaining token budget, and enough
/// blocks are allocated to hold the sequence should every draft token be
/// accepted. On success the state becomes
/// [`SequenceState::SpeculativeScheduled`] and the matching event is emitted.
///
/// # Errors
/// [`ScheduleError::NotIdle`], [`ScheduleError::PrefillNotComplete`],
/// [`ScheduleError::GenerationComplete`],
/// [`ScheduleError::WrongDanglingCount`] or
/// [`ScheduleError::AllocationFailed`].
pub fn schedule_speculative(
    &mut self,
    num_draft_tokens: usize,
    manager: &BlockManager<T>,
) -> Result<(), ScheduleError> {
    self.require_idle()?;
    self.require_prefill_complete()?;
    self.require_not_complete()?;
    self.require_one_dangling()?;

    let num_tokens = num_draft_tokens.min(self.inner.remaining_tokens());
    let block_size = self.inner.block_size();
    let projected_tokens = self.inner.total_tokens() + num_tokens;
    let projected_blocks = projected_tokens.div_ceil(block_size);
    let held_blocks = self.inner.assigned_blocks()
        + self.inner.staged_blocks()
        + self.inner.unassigned_blocks();
    let blocks_allocated = projected_blocks.saturating_sub(held_blocks);

    if blocks_allocated > 0 {
        let granted = self.inner.allocate_blocks(blocks_allocated, manager);
        if !granted {
            return Err(ScheduleError::AllocationFailed {
                needed: blocks_allocated,
            });
        }
    }

    self.state = SequenceState::SpeculativeScheduled {
        num_tokens,
        blocks_allocated,
    };
    self.delegate.on_event(&SequenceEvent::SpeculativeScheduled {
        num_tokens,
        blocks_allocated,
    });
    Ok(())
}
/// Commits the scheduled speculative step with the `accepted` tokens.
///
/// Appends every accepted token, registers pending blocks, advances
/// `kv_position` by the accepted count, drops surplus unassigned blocks and
/// returns to `Idle`. Emits [`SequenceEvent::SpeculativeApplied`].
///
/// # Errors
/// * [`ApplyError::WrongState`] when no speculative step is scheduled.
/// * [`ApplyError::AcceptedExceedsScheduled`] when more tokens are accepted
///   than were scheduled (the scheduled state is left in place).
pub fn apply_speculative(
    &mut self,
    accepted: &[Token],
    manager: &BlockManager<T>,
) -> Result<DecodeOutcome, ApplyError> {
    let SequenceState::SpeculativeScheduled {
        num_tokens: scheduled,
        ..
    } = self.state
    else {
        return Err(ApplyError::WrongState {
            expected: "SpeculativeScheduled",
            actual: self.state,
        });
    };
    let accepted_count = accepted.len();
    if accepted_count > scheduled {
        return Err(ApplyError::AcceptedExceedsScheduled {
            accepted: accepted_count,
            scheduled,
        });
    }

    let mut block_completed = false;
    for token in accepted.iter().copied() {
        block_completed |= self.inner.append_token(token).is_some();
    }
    self.inner.complete_and_register_pending(manager);
    self.kv_position += accepted_count;
    let blocks_released = self.lifo_drop_excess_unassigned();
    self.state = SequenceState::Idle;
    self.delegate.on_event(&SequenceEvent::SpeculativeApplied {
        accepted: accepted_count,
        scheduled,
        blocks_released,
    });

    let outcome = if self.inner.is_complete() {
        if block_completed {
            DecodeOutcome::BlockCompletedAndMaxLength
        } else {
            DecodeOutcome::MaxLength
        }
    } else if block_completed {
        DecodeOutcome::BlockCompleted
    } else {
        DecodeOutcome::Continue
    };
    Ok(outcome)
}
/// Cancels the pending scheduled operation and returns to `Idle`.
///
/// Pops, most recent first, the unassigned blocks that the schedule call
/// allocated. Emits [`SequenceEvent::ScheduleReverted`] carrying the number
/// of blocks that were actually popped, matching how `drop_unassigned` and
/// `apply_speculative` report their counts.
///
/// # Errors
/// [`ApplyError::WrongState`] when nothing is scheduled.
pub fn revert_schedule(&mut self) -> Result<(), ApplyError> {
    // Exhaustive on purpose: a new state variant must decide how it reverts.
    let blocks_to_release = match self.state {
        SequenceState::PrefillScheduled {
            blocks_allocated, ..
        }
        | SequenceState::DecodeScheduled { blocks_allocated }
        | SequenceState::SpeculativeScheduled {
            blocks_allocated, ..
        } => blocks_allocated,
        SequenceState::Idle => {
            return Err(ApplyError::WrongState {
                expected: "any Scheduled state",
                actual: self.state,
            });
        }
    };
    // Report what was really released rather than what was requested.
    let blocks_released = self.lifo_pop_unassigned(blocks_to_release);
    self.state = SequenceState::Idle;
    self.delegate
        .on_event(&SequenceEvent::ScheduleReverted { blocks_released });
    Ok(())
}
/// Pops up to `count` unassigned blocks, most recent first, and returns how
/// many were dropped.
///
/// Emits [`SequenceEvent::UnassignedDropped`] only when at least one block
/// was dropped.
///
/// # Panics
/// Panics when the sequence is not `Idle`.
pub fn drop_unassigned(&mut self, count: usize) -> usize {
    assert!(
        matches!(self.state, SequenceState::Idle),
        "drop_unassigned called in non-Idle state: {:?}",
        self.state
    );
    let dropped = self.lifo_pop_unassigned(count);
    if dropped == 0 {
        return 0;
    }
    let event = SequenceEvent::UnassignedDropped { count: dropped };
    self.delegate.on_event(&event);
    dropped
}
/// Releases the inner sequence and emits [`SequenceEvent::Released`].
///
/// # Errors
/// [`ApplyError::WrongState`] when an operation is pending.
pub fn release(&mut self) -> Result<(), ApplyError> {
    self.require_idle_for_apply().map(|()| {
        self.inner.release();
        self.delegate.on_event(&SequenceEvent::Released);
    })
}
/// Asks the inner sequence to reacquire its blocks from `manager` and
/// returns whether that succeeded.
///
/// Emits [`SequenceEvent::Reacquired`] with the inner sequence's
/// prefix-matched block count and the success flag.
///
/// # Errors
/// [`ApplyError::WrongState`] when an operation is pending.
pub fn reacquire(&mut self, manager: &BlockManager<T>) -> Result<bool, ApplyError> {
    self.require_idle_for_apply()?;
    let success = self.inner.reacquire(manager);
    let event = SequenceEvent::Reacquired {
        prefix_matched: self.inner.prefix_matched_blocks(),
        success,
    };
    self.delegate.on_event(&event);
    Ok(success)
}
/// Appends `tokens` to the inner sequence while idle.
///
/// Neither `kv_position` nor the scheduling state is touched, and no event
/// is emitted.
///
/// # Errors
/// * [`ApplyError::WrongState`] when an operation is pending.
/// * [`ApplyError::AppendExceedsRemaining`] when `tokens` is longer than the
///   remaining token budget; nothing is appended in that case.
pub fn append_tokens(&mut self, tokens: &[Token]) -> Result<(), ApplyError> {
    self.require_idle_for_apply()?;
    let requested = tokens.len();
    let remaining = self.inner.remaining_tokens();
    if requested > remaining {
        return Err(ApplyError::AppendExceedsRemaining {
            requested,
            remaining,
        });
    }
    tokens.iter().copied().for_each(|token| {
        self.inner.append_token(token);
    });
    Ok(())
}
/// Returns the current scheduling state.
pub fn state(&self) -> SequenceState {
self.state
}
/// Returns how many input tokens have been covered by prefix matching and
/// applied prefill chunks.
pub fn prefill_position(&self) -> usize {
self.prefill_position
}
/// Returns `true` once the prefill position has reached the number of input
/// tokens.
pub fn is_prefill_complete(&self) -> bool {
    let num_input = self.inner.num_input_tokens();
    num_input <= self.prefill_position
}
/// Returns the KV position: equal to the prefill position after prefill,
/// then advanced by one for every decoded or accepted token applied.
pub fn kv_position(&self) -> usize {
self.kv_position
}
/// Returns how many tokens lie beyond the KV position (zero if the KV
/// position is at or past the total token count).
pub fn tail_tokens(&self) -> usize {
    let total = self.inner.total_tokens();
    total.saturating_sub(self.kv_position)
}
/// Returns the delegate that receives this sequence's events.
pub fn delegate(&self) -> &Arc<dyn SequenceDelegate> {
&self.delegate
}
// Read-only accessors forwarded verbatim to the same-named method on the
// inner `RequestSequence`.
delegate_to_inner! {
/// Forwards to the inner sequence's `generated_tokens`.
pub fn generated_tokens(&self) -> usize;
/// Forwards to the inner sequence's `max_output_tokens`.
pub fn max_output_tokens(&self) -> usize;
/// Forwards to the inner sequence's `num_input_tokens`.
pub fn num_input_tokens(&self) -> usize;
/// Forwards to the inner sequence's `total_tokens`.
pub fn total_tokens(&self) -> usize;
/// Forwards to the inner sequence's `remaining_tokens`.
pub fn remaining_tokens(&self) -> usize;
/// Forwards to the inner sequence's `num_blocks`.
pub fn num_blocks(&self) -> usize;
/// Forwards to the inner sequence's `assigned_blocks`.
pub fn assigned_blocks(&self) -> usize;
/// Forwards to the inner sequence's `staged_blocks`.
pub fn staged_blocks(&self) -> usize;
/// Forwards to the inner sequence's `unassigned_blocks`.
pub fn unassigned_blocks(&self) -> usize;
/// Forwards to the inner sequence's `prefix_matched_blocks`.
pub fn prefix_matched_blocks(&self) -> usize;
/// Forwards to the inner sequence's `block_size`.
pub fn block_size(&self) -> usize;
/// Forwards to the inner sequence's `is_complete`.
pub fn is_complete(&self) -> bool;
}
/// Returns a shared reference to the wrapped `RequestSequence`.
pub fn inner(&self) -> &RequestSequence<T> {
&self.inner
}
/// Returns a mutable reference to the wrapped `RequestSequence`.
///
/// Crate-private: mutating the inner sequence directly bypasses the
/// state checks and position bookkeeping performed by this wrapper.
// NOTE(review): `dead_code` is allowed presumably because there is no
// non-test caller yet — confirm.
#[allow(dead_code)]
pub(crate) fn inner_mut(&mut self) -> &mut RequestSequence<T> {
&mut self.inner
}
/// Guard for scheduling paths: fails with [`ScheduleError::NotIdle`] unless
/// the sequence is `Idle`.
fn require_idle(&self) -> Result<(), ScheduleError> {
    match self.state {
        SequenceState::Idle => Ok(()),
        busy => Err(ScheduleError::NotIdle { state: busy }),
    }
}
/// Guard for apply-style paths: fails with [`ApplyError::WrongState`] unless
/// the sequence is `Idle`.
fn require_idle_for_apply(&self) -> Result<(), ApplyError> {
    match self.state {
        SequenceState::Idle => Ok(()),
        busy => Err(ApplyError::WrongState {
            expected: "Idle",
            actual: busy,
        }),
    }
}
/// Guard: fails with [`ScheduleError::PrefillNotComplete`] while input
/// tokens remain to be prefilled.
fn require_prefill_complete(&self) -> Result<(), ScheduleError> {
    if self.is_prefill_complete() {
        Ok(())
    } else {
        Err(ScheduleError::PrefillNotComplete {
            position: self.prefill_position,
            num_input_tokens: self.inner.num_input_tokens(),
        })
    }
}
/// Guard: fails with [`ScheduleError::GenerationComplete`] once the inner
/// sequence reports itself complete.
fn require_not_complete(&self) -> Result<(), ScheduleError> {
    if !self.inner.is_complete() {
        return Ok(());
    }
    let generated = self.inner.generated_tokens();
    let max_output = self.inner.max_output_tokens();
    Err(ScheduleError::GenerationComplete {
        generated,
        max_output,
    })
}
/// Guard: fails with [`ScheduleError::WrongDanglingCount`] unless exactly
/// one token lies beyond the KV position.
fn require_one_dangling(&self) -> Result<(), ScheduleError> {
    match self.tail_tokens() {
        1 => Ok(()),
        actual => Err(ScheduleError::WrongDanglingCount {
            expected: 1,
            actual,
        }),
    }
}
/// Pops up to `count` unassigned blocks from the end of the assignments and
/// returns how many were actually popped; stops early once none are left.
fn lifo_pop_unassigned(&mut self, count: usize) -> usize {
    let assignments = self.inner.assignments_mut();
    // `take_while` stops at the first failed pop, so no further pops are
    // attempted after the unassigned list runs dry.
    (0..count)
        .take_while(|_| assignments.pop_last_unassigned().is_some())
        .count()
}
/// Drops surplus unassigned blocks, most recent first, and returns how many
/// were dropped.
///
/// A complete sequence keeps no unassigned block; an incomplete one keeps
/// exactly one, regardless of whether the token count currently sits on a
/// block boundary.
fn lifo_drop_excess_unassigned(&mut self) -> usize {
    // The previous boundary/non-boundary branches both evaluated to 1, so the
    // only real distinction is whether generation has finished.
    let keep = usize::from(!self.inner.is_complete());
    let excess = self.inner.unassigned_blocks().saturating_sub(keep);
    self.lifo_pop_unassigned(excess)
}
}
/// Debug output lists the scheduling state, both positions and the inner
/// sequence; the delegate is left out.
impl<T: BlockMetadata> std::fmt::Debug for SchedulableSequence<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            inner,
            state,
            prefill_position,
            kv_position,
            delegate: _,
        } = self;
        let mut out = f.debug_struct("SchedulableSequence");
        out.field("state", state);
        out.field("prefill_position", prefill_position);
        out.field("kv_position", kv_position);
        out.field("inner", inner);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::testing::{TestMeta, create_test_manager};
use std::sync::Mutex;
// Block size (tokens per block) used by every test sequence in this module.
const BLOCK_SIZE: u32 = 4;
/// Test delegate that records every event it receives, in order.
struct CollectingDelegate {
    events: Mutex<Vec<SequenceEvent>>,
}

impl CollectingDelegate {
    /// Creates a delegate with an empty event log.
    fn new() -> Self {
        Self {
            events: Mutex::default(),
        }
    }

    /// Returns a snapshot of the events recorded so far.
    fn events(&self) -> Vec<SequenceEvent> {
        let recorded = self.events.lock().unwrap();
        recorded.to_vec()
    }
}

impl SequenceDelegate for CollectingDelegate {
    fn on_event(&self, event: &SequenceEvent) {
        let mut recorded = self.events.lock().unwrap();
        recorded.push(event.clone());
    }
}
// Returns `None`, which makes `SchedulableSequence::new` fall back to `NoopDelegate`.
fn noop_delegate() -> Option<Arc<dyn SequenceDelegate>> {
None
}
fn make_tokens(n: usize) -> Vec<Token> {
(0..n as u32).collect()
}
#[test]
fn test_new_starts_idle() {
    // A freshly built sequence is Idle, has nothing prefilled and has
    // reported exactly one `Created` event.
    let recorder = Arc::new(CollectingDelegate::new());
    let sequence = SchedulableSequence::<TestMeta>::new(
        make_tokens(8),
        10,
        BLOCK_SIZE,
        Some(recorder.clone()),
    );

    assert_eq!(sequence.state(), SequenceState::Idle);
    assert_eq!(sequence.prefill_position(), 0);
    assert_eq!(sequence.kv_position(), 0);
    assert_eq!(sequence.tail_tokens(), 8);
    assert_eq!(sequence.num_input_tokens(), 8);
    assert_eq!(sequence.max_output_tokens(), 10);
    assert_eq!(sequence.block_size(), BLOCK_SIZE as usize);
    assert!(!sequence.is_prefill_complete());

    let expected = SequenceEvent::Created {
        num_input_tokens: 8,
        max_output_tokens: 10,
        block_size: BLOCK_SIZE as usize,
    };
    assert_eq!(recorder.events(), vec![expected]);
}
#[test]
fn test_schedule_prefill_requires_idle() {
    // A second schedule while the first is still pending must be rejected.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
    sequence.schedule_prefill(4, &mgr).unwrap();
    let second = sequence.schedule_prefill(4, &mgr);
    assert!(matches!(second, Err(ScheduleError::NotIdle { .. })));
}
#[test]
fn test_schedule_decode_requires_idle() {
    // Scheduling decode twice without applying must fail the second time.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(4), 10, BLOCK_SIZE, noop_delegate());
    sequence.schedule_prefill(4, &mgr).unwrap();
    sequence.apply_prefill(Some(1000), &mgr).unwrap();
    sequence.schedule_decode(&mgr).unwrap();
    let second = sequence.schedule_decode(&mgr);
    assert!(matches!(second, Err(ScheduleError::NotIdle { .. })));
}
#[test]
fn test_apply_prefill_requires_scheduled() {
    // Applying prefill with nothing scheduled is a state error.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
    let result = sequence.apply_prefill(None, &mgr);
    assert!(matches!(result, Err(ApplyError::WrongState { .. })));
}
#[test]
fn test_apply_decode_requires_scheduled() {
    // Applying decode with nothing scheduled is a state error.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(4), 10, BLOCK_SIZE, noop_delegate());
    let result = sequence.apply_decode(100, &mgr);
    assert!(matches!(result, Err(ApplyError::WrongState { .. })));
}
#[test]
fn test_decode_requires_prefill_complete() {
    // Decode cannot be scheduled before any prefill has happened.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
    let result = sequence.schedule_decode(&mgr);
    assert!(matches!(
        result,
        Err(ScheduleError::PrefillNotComplete { .. })
    ));
}
#[test]
fn test_speculative_requires_prefill_complete() {
    // Speculative scheduling is likewise gated on prefill completion.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
    let result = sequence.schedule_speculative(3, &mgr);
    assert!(matches!(
        result,
        Err(ScheduleError::PrefillNotComplete { .. })
    ));
}
#[test]
fn test_prefill_single_chunk() {
    // Prefilling all 8 input tokens at once allocates and registers 2 blocks.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    sequence.schedule_prefill(8, &mgr).unwrap();
    let scheduled = SequenceState::PrefillScheduled {
        num_tokens: 8,
        blocks_allocated: 2,
    };
    assert_eq!(sequence.state(), scheduled);

    sequence.apply_prefill(Some(1000), &mgr).unwrap();
    assert_eq!(sequence.state(), SequenceState::Idle);
    assert_eq!(sequence.prefill_position(), 8);
    assert_eq!(sequence.kv_position(), 8);
    assert!(sequence.is_prefill_complete());
    assert_eq!(sequence.assigned_blocks(), 2);
    assert_eq!(sequence.unassigned_blocks(), 0);
    // The first generated token is the single dangling token.
    assert_eq!(sequence.tail_tokens(), 1);
}
#[test]
fn test_prefill_chunked() {
    // Two 4-token chunks: only the second one carries the generated token.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    // First chunk.
    sequence.schedule_prefill(4, &mgr).unwrap();
    sequence.apply_prefill(None, &mgr).unwrap();
    assert_eq!(sequence.prefill_position(), 4);
    assert_eq!(sequence.kv_position(), 4);
    assert!(!sequence.is_prefill_complete());
    assert_eq!(sequence.assigned_blocks(), 1);

    // Final chunk.
    sequence.schedule_prefill(4, &mgr).unwrap();
    sequence.apply_prefill(Some(1000), &mgr).unwrap();
    assert_eq!(sequence.prefill_position(), 8);
    assert_eq!(sequence.kv_position(), 8);
    assert!(sequence.is_prefill_complete());
    assert_eq!(sequence.assigned_blocks(), 2);
    assert_eq!(sequence.unassigned_blocks(), 0);
    assert_eq!(sequence.tail_tokens(), 1);
}
#[test]
fn test_prefill_final_with_first_token() {
    // The token supplied with the final chunk counts as generated output but
    // does not advance the KV position.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(4), 10, BLOCK_SIZE, noop_delegate());
    sequence.schedule_prefill(4, &mgr).unwrap();
    sequence.apply_prefill(Some(100), &mgr).unwrap();

    assert!(sequence.is_prefill_complete());
    assert_eq!(sequence.generated_tokens(), 1);
    assert_eq!(sequence.total_tokens(), 5);
    assert_eq!(sequence.kv_position(), 4);
    assert_eq!(sequence.tail_tokens(), 1);
}
#[test]
fn test_prefill_token_on_non_final_rejected() {
    // A token may not accompany a chunk that leaves input tokens unprefilled.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
    sequence.schedule_prefill(4, &mgr).unwrap();
    let result = sequence.apply_prefill(Some(100), &mgr);
    assert!(matches!(result, Err(ApplyError::TokenOnNonFinalChunk)));
}
#[test]
fn test_prefill_overrun_rejected() {
    // Requesting 9 tokens of an 8-token input overruns.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
    let result = sequence.schedule_prefill(9, &mgr);
    assert!(matches!(result, Err(ScheduleError::PrefillOverrun { .. })));
}
#[test]
fn test_prefill_allocation_failure() {
    // The manager only has one block, but 8 tokens need two.
    let mgr = create_test_manager::<TestMeta>(1);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
    let result = sequence.schedule_prefill(8, &mgr);
    assert!(matches!(
        result,
        Err(ScheduleError::AllocationFailed { .. })
    ));
    // A failed schedule leaves the sequence idle.
    assert_eq!(sequence.state(), SequenceState::Idle);
}
#[test]
fn test_schedule_prefill_after_complete_rejected() {
    // Once every input token is prefilled, further prefill is refused.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(4), 10, BLOCK_SIZE, noop_delegate());
    sequence.schedule_prefill(4, &mgr).unwrap();
    sequence.apply_prefill(Some(1000), &mgr).unwrap();
    let result = sequence.schedule_prefill(1, &mgr);
    assert!(matches!(result, Err(ScheduleError::PrefillComplete)));
}
#[test]
fn test_apply_prefill_none_on_final_rejected() {
    // With a non-zero output budget the final chunk must carry a token.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(4), 10, BLOCK_SIZE, noop_delegate());
    sequence.schedule_prefill(4, &mgr).unwrap();
    let result = sequence.apply_prefill(None, &mgr);
    assert!(matches!(result, Err(ApplyError::MissingTokenOnFinalChunk)));
}
/// Builds a sequence of `num_input` tokens and, unless the input is empty,
/// prefills it in a single chunk with `1000` as the first generated token.
fn prefilled_seq(
    num_input: usize,
    max_output: usize,
    manager: &BlockManager<TestMeta>,
) -> SchedulableSequence<TestMeta> {
    let mut sequence = SchedulableSequence::new(
        make_tokens(num_input),
        max_output,
        BLOCK_SIZE,
        noop_delegate(),
    );
    if num_input == 0 {
        return sequence;
    }
    sequence.schedule_prefill(num_input, manager).unwrap();
    sequence.apply_prefill(Some(1000), manager).unwrap();
    sequence
}
#[test]
fn test_decode_continue() {
    // A decode step that neither fills a block nor exhausts the budget.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(5, 10, &mgr);
    sequence.schedule_decode(&mgr).unwrap();
    let outcome = sequence.apply_decode(100, &mgr).unwrap();
    assert_eq!(outcome, DecodeOutcome::Continue);
    // One token from prefill plus one from this decode.
    assert_eq!(sequence.generated_tokens(), 2);
    assert_eq!(sequence.state(), SequenceState::Idle);
}
#[test]
fn test_decode_block_completed() {
    // After prefill the second block holds one token; the third decode fills it.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(4, 10, &mgr);
    for _step in 0..2 {
        sequence.schedule_decode(&mgr).unwrap();
        assert_eq!(
            sequence.apply_decode(100, &mgr).unwrap(),
            DecodeOutcome::Continue
        );
    }
    sequence.schedule_decode(&mgr).unwrap();
    let last = sequence.apply_decode(100, &mgr).unwrap();
    assert_eq!(last, DecodeOutcome::BlockCompleted);
    assert_eq!(sequence.assigned_blocks(), 2);
}
#[test]
fn test_decode_max_length() {
    // Output budget of 2: prefill emits the first token, decode the last.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(5, 2, &mgr);
    sequence.schedule_decode(&mgr).unwrap();
    let outcome = sequence.apply_decode(100, &mgr).unwrap();
    assert_eq!(outcome, DecodeOutcome::MaxLength);
    assert!(sequence.is_complete());
}
#[test]
fn test_decode_block_and_max() {
    // The final decode both fills the second block and exhausts the budget.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(4, 4, &mgr);
    for _step in 0..2 {
        sequence.schedule_decode(&mgr).unwrap();
        sequence.apply_decode(100, &mgr).unwrap();
    }
    sequence.schedule_decode(&mgr).unwrap();
    let last = sequence.apply_decode(100, &mgr).unwrap();
    assert_eq!(last, DecodeOutcome::BlockCompletedAndMaxLength);
}
#[test]
fn test_decode_allocates_gen_block() {
    // A generation block is allocated on the first decode and again after the
    // previous one has been filled and registered.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(4, 10, &mgr);
    let one_block_scheduled = SequenceState::DecodeScheduled {
        blocks_allocated: 1,
    };

    sequence.schedule_decode(&mgr).unwrap();
    assert_eq!(sequence.state(), one_block_scheduled);
    assert_eq!(sequence.unassigned_blocks(), 1);

    sequence.apply_decode(100, &mgr).unwrap();
    sequence.schedule_decode(&mgr).unwrap();
    sequence.apply_decode(101, &mgr).unwrap();
    sequence.schedule_decode(&mgr).unwrap();
    let outcome = sequence.apply_decode(102, &mgr).unwrap();
    assert_eq!(outcome, DecodeOutcome::BlockCompleted);
    assert_eq!(sequence.unassigned_blocks(), 0);

    sequence.schedule_decode(&mgr).unwrap();
    assert_eq!(sequence.state(), one_block_scheduled);
    assert_eq!(sequence.unassigned_blocks(), 1);
}
#[test]
fn test_decode_generation_complete_rejected() {
    // Once the output budget is spent, decode can no longer be scheduled.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(5, 2, &mgr);
    sequence.schedule_decode(&mgr).unwrap();
    sequence.apply_decode(100, &mgr).unwrap();
    assert!(sequence.is_complete());
    let result = sequence.schedule_decode(&mgr);
    assert!(matches!(
        result,
        Err(ScheduleError::GenerationComplete { .. })
    ));
}
#[test]
fn test_speculative_basic() {
    // Schedule two draft tokens and accept both.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(8, 10, &mgr);
    sequence.schedule_speculative(2, &mgr).unwrap();
    let scheduled = sequence.state();
    assert!(matches!(
        scheduled,
        SequenceState::SpeculativeScheduled { num_tokens: 2, .. }
    ));
    let outcome = sequence.apply_speculative(&[100, 101], &mgr).unwrap();
    assert_eq!(outcome, DecodeOutcome::Continue);
    // One token from prefill plus the two accepted ones.
    assert_eq!(sequence.generated_tokens(), 3);
    assert_eq!(sequence.state(), SequenceState::Idle);
}
#[test]
fn test_speculative_partial_accept() {
    // Four draft tokens scheduled, two accepted: surplus blocks are dropped
    // so that exactly one unassigned block remains.
    let manager = create_test_manager::<TestMeta>(20);
    let delegate = Arc::new(CollectingDelegate::new());
    let mut seq =
        SchedulableSequence::new(make_tokens(4), 10, BLOCK_SIZE, Some(delegate.clone()));
    seq.schedule_prefill(4, &manager).unwrap();
    seq.apply_prefill(Some(1000), &manager).unwrap();
    seq.schedule_speculative(4, &manager).unwrap();
    let outcome = seq.apply_speculative(&[100, 101], &manager).unwrap();
    assert_eq!(outcome, DecodeOutcome::Continue);
    // One token from prefill plus the two accepted ones.
    assert_eq!(seq.generated_tokens(), 3);
    assert_eq!(seq.unassigned_blocks(), 1);

    // The last event must describe the applied speculative step.
    let events = delegate.events();
    let last = events.last().unwrap();
    if let SequenceEvent::SpeculativeApplied {
        accepted,
        scheduled,
        ..
    } = last
    {
        assert_eq!(*accepted, 2);
        assert_eq!(*scheduled, 4);
    } else {
        panic!("expected SpeculativeApplied");
    }
}
#[test]
fn test_speculative_single_accept() {
    // Five draft tokens scheduled, only one accepted.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(4, 10, &mgr);
    sequence.schedule_speculative(5, &mgr).unwrap();
    let outcome = sequence.apply_speculative(&[100], &mgr).unwrap();
    assert_eq!(outcome, DecodeOutcome::Continue);
    // One token from prefill plus the accepted one.
    assert_eq!(sequence.generated_tokens(), 2);
}
#[test]
fn test_speculative_zero_accept() {
    // Rejecting every draft token leaves the generated count untouched and
    // never leaves fewer blocks available than right after scheduling.
    let manager = create_test_manager::<TestMeta>(20);
    let mut seq = prefilled_seq(4, 10, &manager);
    seq.schedule_speculative(3, &manager).unwrap();
    let avail_after_schedule = manager.available_blocks();
    let outcome = seq.apply_speculative(&[], &manager).unwrap();
    assert_eq!(outcome, DecodeOutcome::Continue);
    assert_eq!(seq.generated_tokens(), 1);
    assert_eq!(seq.unassigned_blocks(), 1);
    assert!(manager.available_blocks() >= avail_after_schedule);
}
#[test]
fn test_speculative_exceeds_scheduled_rejected() {
    // Accepting three tokens when only two were scheduled is an error.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(4, 10, &mgr);
    sequence.schedule_speculative(2, &mgr).unwrap();
    let result = sequence.apply_speculative(&[100, 101, 102], &mgr);
    assert!(matches!(
        result,
        Err(ApplyError::AcceptedExceedsScheduled {
            accepted: 3,
            scheduled: 2,
        })
    ));
}
#[test]
fn test_speculative_block_boundaries() {
    // 7 input + 1 prefill token + 5 accepted tokens crosses a block boundary.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(7, 20, &mgr);
    sequence.schedule_speculative(5, &mgr).unwrap();
    let accepted = [100, 101, 102, 103, 104];
    let outcome = sequence.apply_speculative(&accepted, &mgr).unwrap();
    assert_eq!(outcome, DecodeOutcome::BlockCompleted);
    assert_eq!(sequence.generated_tokens(), 6);
    assert_eq!(sequence.assigned_blocks(), 3);
}
#[test]
fn test_revert_prefill() {
    // Reverting a scheduled prefill returns its blocks to the manager.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
    let free_at_start = mgr.available_blocks();

    sequence.schedule_prefill(4, &mgr).unwrap();
    assert!(mgr.available_blocks() < free_at_start);

    sequence.revert_schedule().unwrap();
    assert_eq!(sequence.state(), SequenceState::Idle);
    assert_eq!(mgr.available_blocks(), free_at_start);
}
#[test]
fn test_revert_decode() {
    // Fill the generation block first so the next schedule must allocate one.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(4, 10, &mgr);
    for _step in 0..3 {
        sequence.schedule_decode(&mgr).unwrap();
        sequence.apply_decode(100, &mgr).unwrap();
    }
    assert_eq!(sequence.unassigned_blocks(), 0);

    let free_before = mgr.available_blocks();
    sequence.schedule_decode(&mgr).unwrap();
    assert_eq!(mgr.available_blocks(), free_before - 1);

    // Reverting hands the freshly allocated block back.
    sequence.revert_schedule().unwrap();
    assert_eq!(sequence.state(), SequenceState::Idle);
    assert_eq!(mgr.available_blocks(), free_before);
}
#[test]
fn test_revert_speculative() {
    // Reverting a speculative schedule restores the manager's free count.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(4, 10, &mgr);
    let free_before = mgr.available_blocks();

    sequence.schedule_speculative(4, &mgr).unwrap();
    let taken = free_before - mgr.available_blocks();

    sequence.revert_schedule().unwrap();
    assert_eq!(sequence.state(), SequenceState::Idle);
    assert_eq!(mgr.available_blocks(), free_before);
    assert!(taken > 0 || sequence.unassigned_blocks() > 0);
}
#[test]
fn test_revert_returns_blocks_to_manager() {
    // Full-input prefill takes blocks; revert gives every one of them back.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
    let free_at_start = mgr.available_blocks();

    sequence.schedule_prefill(8, &mgr).unwrap();
    let free_while_scheduled = mgr.available_blocks();
    assert!(free_while_scheduled < free_at_start);

    sequence.revert_schedule().unwrap();
    assert_eq!(mgr.available_blocks(), free_at_start);
}
#[test]
fn test_drop_unassigned_lifo() {
    // After one decode a single unassigned block remains and can be dropped.
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(4, 10, &mgr);
    sequence.schedule_decode(&mgr).unwrap();
    sequence.apply_decode(100, &mgr).unwrap();
    assert_eq!(sequence.unassigned_blocks(), 1);

    assert_eq!(sequence.drop_unassigned(1), 1);
    assert_eq!(sequence.unassigned_blocks(), 0);
}
/// Checks that asking to drop more unassigned blocks than exist drops only
/// what is available and reports that smaller count.
#[test]
fn test_drop_unassigned_partial() {
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(8, 10, &mgr);

    sequence.schedule_decode(&mgr).unwrap();
    sequence.apply_decode(100, &mgr).unwrap();
    assert_eq!(sequence.unassigned_blocks(), 1);

    // Request five, but only one unassigned block is held.
    let released = sequence.drop_unassigned(5);
    assert_eq!(released, 1);
    assert_eq!(sequence.unassigned_blocks(), 0);
}
/// Checks that dropping zero unassigned blocks is a no-op: nothing is
/// reported as dropped and the held unassigned block stays in place.
#[test]
fn test_drop_unassigned_zero() {
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(4, 10, &mgr);

    sequence.schedule_decode(&mgr).unwrap();
    sequence.apply_decode(100, &mgr).unwrap();

    let released = sequence.drop_unassigned(0);
    assert_eq!(released, 0);
    // The block left over from the decode step is still there.
    assert_eq!(sequence.unassigned_blocks(), 1);
}
/// Drives a sequence through prefill, two decode steps and release, and
/// checks that the delegate observed exactly the expected eight events in
/// order.
#[test]
fn test_delegate_full_lifecycle() {
    let mgr = create_test_manager::<TestMeta>(20);
    let delegate = Arc::new(CollectingDelegate::new());
    let mut sequence = SchedulableSequence::<TestMeta>::new(
        make_tokens(4),
        3,
        BLOCK_SIZE,
        Some(delegate.clone()),
    );

    // Prefill all four input tokens and emit the first output token.
    sequence.schedule_prefill(4, &mgr).unwrap();
    sequence.apply_prefill(Some(1000), &mgr).unwrap();

    // Two decode steps.
    sequence.schedule_decode(&mgr).unwrap();
    sequence.apply_decode(100, &mgr).unwrap();
    sequence.schedule_decode(&mgr).unwrap();
    sequence.apply_decode(101, &mgr).unwrap();

    sequence.release().unwrap();

    // Creation + (schedule, apply) x 3 + release = 8 events.
    let observed = delegate.events();
    assert_eq!(observed.len(), 8);
    assert!(matches!(observed[0], SequenceEvent::Created { .. }));
    assert!(matches!(observed[1], SequenceEvent::PrefillScheduled { .. }));
    assert!(matches!(observed[2], SequenceEvent::PrefillApplied { .. }));
    assert!(matches!(observed[3], SequenceEvent::DecodeScheduled { .. }));
    assert!(matches!(observed[4], SequenceEvent::DecodeApplied { .. }));
    assert!(matches!(observed[5], SequenceEvent::DecodeScheduled { .. }));
    assert!(matches!(observed[6], SequenceEvent::DecodeApplied { .. }));
    assert!(matches!(observed[7], SequenceEvent::Released));
}
/// End-to-end run: prefill six tokens, decode until the seven-token output
/// budget is exhausted, then release and check all blocks are gone.
#[test]
fn test_full_lifecycle_prefill_decode_release() {
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(6), 7, BLOCK_SIZE, noop_delegate());

    sequence.schedule_prefill(6, &mgr).unwrap();
    sequence.apply_prefill(Some(1000), &mgr).unwrap();
    assert!(sequence.is_prefill_complete());
    assert_eq!(sequence.assigned_blocks(), 1);
    assert_eq!(sequence.unassigned_blocks(), 1);

    // Six decode steps; only the last one may report the length limit.
    for i in 0..6u32 {
        sequence.schedule_decode(&mgr).unwrap();
        let outcome = sequence.apply_decode(100 + i, &mgr).unwrap();
        if i >= 5 {
            assert!(
                matches!(
                    outcome,
                    DecodeOutcome::MaxLength | DecodeOutcome::BlockCompletedAndMaxLength
                ),
                "last token should hit max length, got: {outcome:?}"
            );
        } else {
            match outcome {
                DecodeOutcome::Continue | DecodeOutcome::BlockCompleted => {}
                other => panic!("unexpected outcome at token {i}: {other:?}"),
            }
        }
    }

    // Seven generated tokens in total (presumably the prefill-emitted token
    // plus the six decoded ones), on top of the six input tokens.
    assert!(sequence.is_complete());
    assert_eq!(sequence.generated_tokens(), 7);
    assert_eq!(sequence.total_tokens(), 13);

    sequence.release().unwrap();
    assert_eq!(sequence.assigned_blocks(), 0);
}
/// Releases a partially decoded sequence, reacquires its blocks, and checks
/// that generation can continue from where it stopped.
#[test]
fn test_preempt_and_reacquire() {
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    sequence.schedule_prefill(8, &mgr).unwrap();
    sequence.apply_prefill(Some(1000), &mgr).unwrap();
    for _step in 0..2 {
        sequence.schedule_decode(&mgr).unwrap();
        sequence.apply_decode(100, &mgr).unwrap();
    }
    assert_eq!(sequence.generated_tokens(), 3);

    // Preempt: the sequence gives up its assigned blocks.
    sequence.release().unwrap();
    assert_eq!(sequence.assigned_blocks(), 0);

    // Reacquire: blocks are assigned again and the generated-token count is
    // unchanged.
    assert!(sequence.reacquire(&mgr).unwrap());
    assert_eq!(sequence.assigned_blocks(), 2);
    assert_eq!(sequence.unassigned_blocks(), 0);
    assert_eq!(sequence.generated_tokens(), 3);

    // Decoding resumes normally; the outcome itself is not checked.
    sequence.schedule_decode(&mgr).unwrap();
    let _unchecked_outcome = sequence.apply_decode(200, &mgr).unwrap();
    assert_eq!(sequence.generated_tokens(), 4);
}
/// Pre-registers a block covering the first four tokens, then checks that a
/// new sequence over the same tokens matches it as a prefix and only needs
/// to prefill the remainder.
#[test]
fn test_match_and_add_prefix() {
    let mgr = create_test_manager::<TestMeta>(20);
    let tokens = make_tokens(8);

    // Populate the manager: complete and register one block built from the
    // first four tokens.
    let seed_sequence = crate::BlockSequence::new(tokens[..4].to_vec(), BLOCK_SIZE, None);
    let fresh_blocks = mgr.allocate_blocks(1).unwrap();
    let mut registered = Vec::new();
    for (mutable, token_block) in fresh_blocks.into_iter().zip(seed_sequence.blocks().iter()) {
        registered.push(mgr.register_block(mutable.complete(token_block).unwrap()));
    }
    // Let go of the registered handles before the sequence tries to match.
    drop(registered);

    let mut sequence =
        SchedulableSequence::<TestMeta>::new(tokens, 10, BLOCK_SIZE, noop_delegate());
    let matched = sequence.match_and_add_prefix(&mgr).unwrap();
    assert_eq!(matched, 1);
    // The matched block advances both positions past the first four tokens.
    assert_eq!(sequence.prefill_position(), 4);
    assert_eq!(sequence.kv_position(), 4);
    assert_eq!(sequence.assigned_blocks(), 1);

    // Prefill the remaining four tokens.
    sequence.schedule_prefill(4, &mgr).unwrap();
    sequence.apply_prefill(Some(1000), &mgr).unwrap();
    assert_eq!(sequence.assigned_blocks(), 2);
    assert_eq!(sequence.unassigned_blocks(), 0);
    assert_eq!(sequence.tail_tokens(), 1);
}
/// Checks that prefix matching against a manager with nothing registered
/// matches zero blocks and leaves the prefill position at the start.
#[test]
fn test_match_and_add_prefix_no_hits() {
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    let hits = sequence.match_and_add_prefix(&mgr).unwrap();

    assert_eq!(hits, 0);
    assert_eq!(sequence.prefill_position(), 0);
}
/// Exercises a sequence created with no input tokens: prefill is already
/// complete, decode is rejected until a token is appended, and decoding
/// works afterwards.
#[test]
fn test_empty_tokens_prefill() {
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(vec![], 10, BLOCK_SIZE, noop_delegate());
    assert!(sequence.is_prefill_complete());

    // Nothing to prefill, and no tail token to decode from yet.
    let prefill_err = sequence.schedule_prefill(0, &mgr).unwrap_err();
    assert!(matches!(prefill_err, ScheduleError::PrefillComplete));
    let decode_err = sequence.schedule_decode(&mgr).unwrap_err();
    assert!(matches!(decode_err, ScheduleError::WrongDanglingCount { .. }));

    // Appending one token provides the tail token decode requires.
    sequence.append_tokens(&[100]).unwrap();
    assert_eq!(sequence.tail_tokens(), 1);
    sequence.schedule_decode(&mgr).unwrap();
    let outcome = sequence.apply_decode(101, &mgr).unwrap();
    assert_eq!(outcome, DecodeOutcome::Continue);
}
/// Checks that with a zero output budget, prefill leaves no unassigned block
/// behind and scheduling a decode is rejected as generation-complete.
#[test]
fn test_zero_max_output_no_gen_block() {
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(4), 0, BLOCK_SIZE, noop_delegate());

    // Prefill without emitting a token.
    sequence.schedule_prefill(4, &mgr).unwrap();
    sequence.apply_prefill(None, &mgr).unwrap();
    assert_eq!(sequence.assigned_blocks(), 1);
    assert_eq!(sequence.unassigned_blocks(), 0);

    let decode_err = sequence.schedule_decode(&mgr).unwrap_err();
    assert!(matches!(
        decode_err,
        ScheduleError::GenerationComplete { .. }
    ));
}
/// Checks that the `Debug` output of a fresh sequence mentions the type name
/// and its initial `Idle` state.
#[test]
fn test_debug_impl() {
    let seq =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    let rendered = format!("{seq:?}");

    for needle in ["SchedulableSequence", "Idle"] {
        assert!(rendered.contains(needle));
    }
}
/// Checks that reverting a schedule on a sequence that has nothing scheduled
/// fails with `ApplyError::WrongState`.
#[test]
fn test_revert_idle_rejected() {
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    let failure = sequence.revert_schedule().unwrap_err();

    assert!(matches!(failure, ApplyError::WrongState { .. }));
}
/// Checks that releasing a sequence while a prefill is still scheduled fails
/// with `ApplyError::WrongState`.
#[test]
fn test_release_while_scheduled_rejected() {
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    // Leave the prefill scheduled but not applied.
    sequence.schedule_prefill(4, &mgr).unwrap();

    let failure = sequence.release().unwrap_err();
    assert!(matches!(failure, ApplyError::WrongState { .. }));
}
/// Checks that reacquiring blocks while a prefill is still scheduled fails
/// with `ApplyError::WrongState`.
#[test]
fn test_reacquire_while_scheduled_rejected() {
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    // Leave the prefill scheduled but not applied.
    sequence.schedule_prefill(4, &mgr).unwrap();

    let failure = sequence.reacquire(&mgr).unwrap_err();
    assert!(matches!(failure, ApplyError::WrongState { .. }));
}
/// Tracks `kv_position` and `tail_tokens` across prefill, a decode step and
/// a partially accepted speculative step.
#[test]
fn test_dangling_tokens_tracking() {
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 20, BLOCK_SIZE, noop_delegate());

    // Before any work: nothing in KV, all eight input tokens are tail.
    assert_eq!(sequence.kv_position(), 0);
    assert_eq!(sequence.tail_tokens(), 8);

    // Full prefill that also emits one token.
    sequence.schedule_prefill(8, &mgr).unwrap();
    sequence.apply_prefill(Some(1000), &mgr).unwrap();
    assert_eq!(sequence.kv_position(), 8);
    assert_eq!(sequence.tail_tokens(), 1);

    // A decode step advances KV by one and keeps a single tail token.
    sequence.schedule_decode(&mgr).unwrap();
    sequence.apply_decode(100, &mgr).unwrap();
    assert_eq!(sequence.kv_position(), 9);
    assert_eq!(sequence.tail_tokens(), 1);

    // Three speculative tokens scheduled, two applied: KV advances by two.
    sequence.schedule_speculative(3, &mgr).unwrap();
    sequence.apply_speculative(&[200, 201], &mgr).unwrap();
    assert_eq!(sequence.kv_position(), 11);
    assert_eq!(sequence.tail_tokens(), 1);
}
/// Checks that scheduling a decode is rejected with `WrongDanglingCount`
/// when the sequence has zero or two tail tokens instead of exactly one.
#[test]
fn test_decode_requires_one_dangling() {
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(vec![], 10, BLOCK_SIZE, noop_delegate());

    // Empty sequence: zero tail tokens.
    let none_err = sequence.schedule_decode(&mgr).unwrap_err();
    assert!(matches!(
        none_err,
        ScheduleError::WrongDanglingCount {
            expected: 1,
            actual: 0,
        }
    ));

    // Two appended tokens: one too many.
    sequence.append_tokens(&[100, 101]).unwrap();
    assert_eq!(sequence.tail_tokens(), 2);
    let two_err = sequence.schedule_decode(&mgr).unwrap_err();
    assert!(matches!(
        two_err,
        ScheduleError::WrongDanglingCount {
            expected: 1,
            actual: 2,
        }
    ));
}
/// Checks that appending a token to an empty sequence creates one tail token
/// without touching the KV position, and that a decode step then advances
/// the KV position by one.
#[test]
fn test_append_tokens_creates_dangling() {
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(vec![], 10, BLOCK_SIZE, noop_delegate());
    assert_eq!(sequence.tail_tokens(), 0);

    sequence.append_tokens(&[100]).unwrap();
    assert_eq!(sequence.tail_tokens(), 1);
    assert_eq!(sequence.total_tokens(), 1);
    assert_eq!(sequence.kv_position(), 0);

    // The appended token is enough to schedule and apply a decode.
    sequence.schedule_decode(&mgr).unwrap();
    sequence.apply_decode(101, &mgr).unwrap();
    assert_eq!(sequence.tail_tokens(), 1);
    assert_eq!(sequence.kv_position(), 1);
}
/// Checks that appending more tokens than the remaining output budget fails
/// with `AppendExceedsRemaining` and leaves every counter untouched.
#[test]
fn test_append_tokens_exceeding_remaining_returns_error_without_mutation() {
    // Output budget of one token; two are appended below.
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(vec![], 1, BLOCK_SIZE, noop_delegate());

    let failure = sequence.append_tokens(&[100, 101]).unwrap_err();
    assert!(matches!(
        failure,
        ApplyError::AppendExceedsRemaining {
            requested: 2,
            remaining: 1,
        }
    ));

    // The rejected append must not have changed any state.
    assert_eq!(sequence.generated_tokens(), 0);
    assert_eq!(sequence.remaining_tokens(), 1);
    assert_eq!(sequence.total_tokens(), 0);
    assert_eq!(sequence.tail_tokens(), 0);
    assert_eq!(sequence.kv_position(), 0);
}
/// Follows `kv_position` through a two-chunk prefill, one decode step and a
/// fully accepted speculative step.
#[test]
fn test_kv_position_through_lifecycle() {
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 20, BLOCK_SIZE, noop_delegate());
    assert_eq!(sequence.kv_position(), 0);

    // First prefill chunk: four tokens, no token emitted.
    sequence.schedule_prefill(4, &mgr).unwrap();
    sequence.apply_prefill(None, &mgr).unwrap();
    assert_eq!(sequence.kv_position(), 4);

    // Second prefill chunk: the remaining four tokens, emitting one token.
    sequence.schedule_prefill(4, &mgr).unwrap();
    sequence.apply_prefill(Some(1000), &mgr).unwrap();
    assert_eq!(sequence.kv_position(), 8);

    // One decode step.
    sequence.schedule_decode(&mgr).unwrap();
    sequence.apply_decode(100, &mgr).unwrap();
    assert_eq!(sequence.kv_position(), 9);

    // Three speculative tokens scheduled and all three applied.
    sequence.schedule_speculative(3, &mgr).unwrap();
    sequence.apply_speculative(&[200, 201, 202], &mgr).unwrap();
    assert_eq!(sequence.kv_position(), 12);
}
/// Checks block accounting around a decode step that starts from a 7-token
/// prefill: one block is allocated at schedule time and, after the decode is
/// applied, a second block is assigned while one unassigned block remains.
#[test]
fn test_pending_completion_staged_during_decode() {
    let mgr = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(7), 10, BLOCK_SIZE, noop_delegate());

    sequence.schedule_prefill(7, &mgr).unwrap();
    sequence.apply_prefill(Some(1000), &mgr).unwrap();
    assert_eq!(sequence.assigned_blocks(), 1);
    assert_eq!(sequence.unassigned_blocks(), 1);
    assert_eq!(sequence.kv_position(), 7);
    assert_eq!(sequence.tail_tokens(), 1);

    // Scheduling the decode allocates exactly one block.
    sequence.schedule_decode(&mgr).unwrap();
    let expected_state = SequenceState::DecodeScheduled {
        blocks_allocated: 1,
    };
    assert_eq!(sequence.state(), expected_state);

    // Applying it reports `Continue`, yet the assigned-block count goes up.
    let outcome = sequence.apply_decode(100, &mgr).unwrap();
    assert_eq!(outcome, DecodeOutcome::Continue);
    assert_eq!(sequence.assigned_blocks(), 2);
    assert_eq!(sequence.unassigned_blocks(), 1);
    assert_eq!(sequence.kv_position(), 8);
    assert_eq!(sequence.tail_tokens(), 1);
}
/// Builds a sequence through the builder API and checks that the configured
/// values are reflected by its accessors.
#[test]
fn test_builder_basic() {
    let configured = SchedulableSequence::<TestMeta>::builder()
        .tokens(make_tokens(8))
        .max_output_tokens(10)
        .block_size(BLOCK_SIZE);
    let built = configured.build::<TestMeta>().unwrap();

    assert_eq!(built.state(), SequenceState::Idle);
    assert_eq!(built.num_input_tokens(), 8);
    assert_eq!(built.max_output_tokens(), 10);
    assert_eq!(built.block_size(), BLOCK_SIZE as usize);
}
/// Builds a sequence with a collecting delegate attached and checks that the
/// delegate has received exactly one `Created` event.
#[test]
fn test_builder_with_delegate() {
    let delegate = Arc::new(CollectingDelegate::new());

    let configured = SchedulableSequence::<TestMeta>::builder()
        .tokens(make_tokens(4))
        .max_output_tokens(5)
        .block_size(BLOCK_SIZE)
        .delegate(delegate.clone());
    let built = configured.build::<TestMeta>().unwrap();
    assert_eq!(built.num_input_tokens(), 4);

    // Only the creation event has been emitted so far.
    let observed = delegate.events();
    assert_eq!(observed.len(), 1);
    assert!(matches!(observed[0], SequenceEvent::Created { .. }));
}
/// Checks that building fails when only `tokens` is set and the other
/// fields (`max_output_tokens`, `block_size`) are left out.
#[test]
fn test_builder_missing_required_field() {
    let incomplete = SchedulableSequence::<TestMeta>::builder().tokens(make_tokens(4));

    let outcome = incomplete.build::<TestMeta>();

    assert!(outcome.is_err());
}
/// Smoke test: a sequence built without an explicit delegate can run through
/// prefill, one decode step and release without any call failing.
#[test]
fn test_builder_default_noop_delegate() {
    let mgr = create_test_manager::<TestMeta>(20);

    let configured = SchedulableSequence::<TestMeta>::builder()
        .tokens(make_tokens(4))
        .max_output_tokens(10)
        .block_size(BLOCK_SIZE);
    let mut sequence = configured.build::<TestMeta>().unwrap();

    // Every step is unwrapped; there are no further assertions.
    sequence.schedule_prefill(4, &mgr).unwrap();
    sequence.apply_prefill(Some(1000), &mgr).unwrap();
    sequence.schedule_decode(&mgr).unwrap();
    sequence.apply_decode(100, &mgr).unwrap();
    sequence.release().unwrap();
}
}