1use std::sync::Arc;
150
151use derive_builder::Builder;
152
153use crate::blocks::BlockMetadata;
154use crate::manager::BlockManager;
155
156use super::request::RequestSequence;
157use dynamo_tokens::Token;
158
/// Scheduling state of a [`SchedulableSequence`].
///
/// A sequence starts `Idle`; each `schedule_*` call moves it to the matching
/// `*Scheduled` variant and the matching `apply_*` call (or `revert_schedule`)
/// moves it back to `Idle`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SequenceState {
    /// Nothing is scheduled.
    Idle,
    /// A prefill chunk has been scheduled and awaits `apply_prefill`.
    PrefillScheduled {
        /// Number of input tokens in the scheduled chunk.
        num_tokens: usize,
        /// Blocks newly allocated by the scheduling call.
        blocks_allocated: usize,
    },
    /// A single-token decode step has been scheduled and awaits `apply_decode`.
    DecodeScheduled {
        /// Blocks newly allocated by the scheduling call.
        blocks_allocated: usize,
    },
    /// A speculative step has been scheduled and awaits `apply_speculative`.
    SpeculativeScheduled {
        /// Number of draft tokens scheduled (after clamping by the scheduler).
        num_tokens: usize,
        /// Blocks newly allocated by the scheduling call.
        blocks_allocated: usize,
    },
}
179
/// Result of applying a decode or speculative step.
///
/// Combines two independent facts: whether appending the new token(s)
/// completed a block, and whether the sequence reports itself complete
/// afterwards.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DecodeOutcome {
    /// No block was completed and the sequence is not complete.
    Continue,
    /// A block was completed; the sequence is not complete.
    BlockCompleted,
    /// The sequence is complete; no block was completed by this step.
    MaxLength,
    /// A block was completed and the sequence is complete.
    BlockCompletedAndMaxLength,
}
189
190#[derive(Debug, Clone, PartialEq, Eq)]
192pub enum SequenceEvent {
193 Created {
194 num_input_tokens: usize,
195 max_output_tokens: usize,
196 block_size: usize,
197 },
198 PrefixMatched {
199 blocks_matched: usize,
200 },
201 PrefillScheduled {
202 num_tokens: usize,
203 blocks_allocated: usize,
204 },
205 PrefillApplied {
206 num_tokens: usize,
207 blocks_registered: usize,
208 token_emitted: bool,
209 },
210 DecodeScheduled {
211 blocks_allocated: usize,
212 },
213 DecodeApplied {
214 token: Token,
215 block_completed: bool,
216 },
217 SpeculativeScheduled {
218 num_tokens: usize,
219 blocks_allocated: usize,
220 },
221 SpeculativeApplied {
222 accepted: usize,
223 scheduled: usize,
224 blocks_released: usize,
225 },
226 ScheduleReverted {
227 blocks_released: usize,
228 },
229 UnassignedDropped {
230 count: usize,
231 },
232 Released,
233 Reacquired {
234 prefix_matched: usize,
235 success: bool,
236 },
237}
238
239pub trait SequenceDelegate: Send + Sync {
249 fn on_event(&self, event: &SequenceEvent);
250}
251
252pub struct NoopDelegate;
257
258impl SequenceDelegate for NoopDelegate {
259 fn on_event(&self, _event: &SequenceEvent) {}
260}
261
262#[doc(hidden)]
267#[derive(Builder)]
268#[builder(
269 name = "SchedulableSequenceBuilder",
270 pattern = "owned",
271 build_fn(private, name = "build_params", error = "anyhow::Error")
272)]
273pub struct SchedulableSequenceParams {
274 tokens: Vec<Token>,
275 max_output_tokens: usize,
276 block_size: u32,
277 #[builder(default, setter(custom))]
278 delegate: Option<Arc<dyn SequenceDelegate>>,
279}
280
281impl SchedulableSequenceBuilder {
282 pub fn delegate(mut self, delegate: Arc<dyn SequenceDelegate>) -> Self {
283 self.delegate = Some(Some(delegate));
284 self
285 }
286
287 pub fn build<T: BlockMetadata>(self) -> anyhow::Result<SchedulableSequence<T>> {
288 let params = self.build_params()?;
289 Ok(SchedulableSequence::new(
290 params.tokens,
291 params.max_output_tokens,
292 params.block_size,
293 params.delegate,
294 ))
295 }
296}
297
298#[derive(Debug, thiserror::Error)]
304pub enum ScheduleError {
305 #[error("expected Idle state, got {state:?}")]
306 NotIdle { state: SequenceState },
307 #[error("prefill overrun: position {position} + {num_tokens} > {num_input_tokens}")]
308 PrefillOverrun {
309 position: usize,
310 num_tokens: usize,
311 num_input_tokens: usize,
312 },
313 #[error("prefill already complete")]
314 PrefillComplete,
315 #[error("prefill not yet complete (position {position} < {num_input_tokens})")]
316 PrefillNotComplete {
317 position: usize,
318 num_input_tokens: usize,
319 },
320 #[error("allocation failed: needed {needed} blocks")]
321 AllocationFailed { needed: usize },
322 #[error("generation complete: {generated} >= {max_output}")]
323 GenerationComplete { generated: usize, max_output: usize },
324 #[error("expected {expected} dangling token(s), got {actual}")]
325 WrongDanglingCount { expected: usize, actual: usize },
326}
327
328#[derive(Debug, thiserror::Error)]
330pub enum ApplyError {
331 #[error("expected {expected}, got {actual:?}")]
332 WrongState {
333 expected: &'static str,
334 actual: SequenceState,
335 },
336 #[error("token provided but prefill not completing this chunk")]
337 TokenOnNonFinalChunk,
338 #[error("accepted {accepted} tokens exceeds scheduled {scheduled}")]
339 AcceptedExceedsScheduled { accepted: usize, scheduled: usize },
340 #[error("final prefill chunk requires a generated token")]
341 MissingTokenOnFinalChunk,
342 #[error("append requested {requested} tokens but only {remaining} remain")]
343 AppendExceedsRemaining { requested: usize, remaining: usize },
344}
345
// Generates `&self` accessors that forward to the same-named method on
// `self.inner`. Each entry is a bodiless signature terminated by `;`;
// attributes (including doc comments) and visibility are carried over.
macro_rules! delegate_to_inner {
    (
        $(
            $(#[$attr:meta])*
            $visibility:vis fn $method:ident(&self) -> $output:ty;
        )*
    ) => {
        $(
            $(#[$attr])*
            $visibility fn $method(&self) -> $output {
                self.inner.$method()
            }
        )*
    };
}
356
357pub struct SchedulableSequence<T: BlockMetadata> {
364 inner: RequestSequence<T>,
365 state: SequenceState,
366 prefill_position: usize,
367 kv_position: usize,
368 delegate: Arc<dyn SequenceDelegate>,
369}
370
371impl<T: BlockMetadata> SchedulableSequence<T> {
372 pub fn builder() -> SchedulableSequenceBuilder {
378 SchedulableSequenceBuilder::default()
379 }
380
381 pub fn new(
385 tokens: Vec<Token>,
386 max_output_tokens: usize,
387 block_size: u32,
388 delegate: Option<Arc<dyn SequenceDelegate>>,
389 ) -> Self {
390 let inner = RequestSequence::new(tokens, max_output_tokens, block_size);
391 let delegate = delegate.unwrap_or_else(|| Arc::new(NoopDelegate));
392 delegate.on_event(&SequenceEvent::Created {
393 num_input_tokens: inner.num_input_tokens(),
394 max_output_tokens,
395 block_size: block_size as usize,
396 });
397 Self {
398 inner,
399 state: SequenceState::Idle,
400 prefill_position: 0,
401 kv_position: 0,
402 delegate,
403 }
404 }
405
406 pub fn match_and_add_prefix(
414 &mut self,
415 manager: &BlockManager<T>,
416 ) -> Result<usize, ScheduleError> {
417 self.require_idle()?;
418
419 let count = self
420 .inner
421 .match_and_add_prefix(manager)
422 .unwrap_or_else(|_| panic!("prefix match should not produce duplicates"));
423
424 if count > 0 {
425 self.prefill_position += count * self.inner.block_size();
426 self.kv_position = self.prefill_position;
427 }
428
429 self.delegate.on_event(&SequenceEvent::PrefixMatched {
430 blocks_matched: count,
431 });
432
433 Ok(count)
434 }
435
436 pub fn schedule_prefill(
445 &mut self,
446 num_tokens: usize,
447 manager: &BlockManager<T>,
448 ) -> Result<(), ScheduleError> {
449 self.require_idle()?;
450
451 if self.is_prefill_complete() {
452 return Err(ScheduleError::PrefillComplete);
453 }
454
455 let num_input = self.inner.num_input_tokens();
456 let new_position = self.prefill_position + num_tokens;
457 if new_position > num_input {
458 return Err(ScheduleError::PrefillOverrun {
459 position: self.prefill_position,
460 num_tokens,
461 num_input_tokens: num_input,
462 });
463 }
464
465 let bs = self.inner.block_size();
467 let total_blocks_needed = new_position.div_ceil(bs);
468
469 let already_have = self.inner.assigned_blocks()
471 + self.inner.staged_blocks()
472 + self.inner.unassigned_blocks();
473
474 let to_allocate = total_blocks_needed.saturating_sub(already_have);
475
476 if to_allocate > 0 && !self.inner.allocate_blocks(to_allocate, manager) {
477 return Err(ScheduleError::AllocationFailed {
478 needed: to_allocate,
479 });
480 }
481
482 self.state = SequenceState::PrefillScheduled {
483 num_tokens,
484 blocks_allocated: to_allocate,
485 };
486
487 self.delegate.on_event(&SequenceEvent::PrefillScheduled {
488 num_tokens,
489 blocks_allocated: to_allocate,
490 });
491
492 Ok(())
493 }
494
495 pub fn apply_prefill(
501 &mut self,
502 token: Option<Token>,
503 manager: &BlockManager<T>,
504 ) -> Result<(), ApplyError> {
505 let (num_tokens, _blocks_allocated) = match self.state {
506 SequenceState::PrefillScheduled {
507 num_tokens,
508 blocks_allocated,
509 } => (num_tokens, blocks_allocated),
510 other => {
511 return Err(ApplyError::WrongState {
512 expected: "PrefillScheduled",
513 actual: other,
514 });
515 }
516 };
517
518 let new_position = self.prefill_position + num_tokens;
519 let is_final = new_position >= self.inner.num_input_tokens();
520
521 if token.is_some() && !is_final {
522 return Err(ApplyError::TokenOnNonFinalChunk);
523 }
524 if is_final && token.is_none() && self.inner.max_output_tokens() > 0 {
525 return Err(ApplyError::MissingTokenOnFinalChunk);
526 }
527
528 let blocks_registered_before = self.inner.assigned_blocks();
529
530 self.inner.complete_and_register_pending(manager);
532 self.prefill_position = new_position;
533 self.kv_position = self.prefill_position;
534
535 let token_emitted = token.is_some();
540 if let Some(tok) = token {
541 self.inner.append_token(tok);
542 }
543
544 let blocks_registered =
545 self.inner.assigned_blocks() - blocks_registered_before + self.inner.staged_blocks();
546
547 self.state = SequenceState::Idle;
548 self.delegate.on_event(&SequenceEvent::PrefillApplied {
549 num_tokens,
550 blocks_registered,
551 token_emitted,
552 });
553
554 Ok(())
555 }
556
557 pub fn schedule_decode(&mut self, manager: &BlockManager<T>) -> Result<(), ScheduleError> {
567 self.require_idle()?;
568 self.require_prefill_complete()?;
569 self.require_not_complete()?;
570 self.require_one_dangling()?;
571
572 let complete_in_seq = self.inner.complete_sequence_blocks();
574 let registered = self.inner.assigned_blocks() + self.inner.staged_blocks();
575 let pending = complete_in_seq.saturating_sub(registered);
576
577 let need = pending + 1;
579 let have = self.inner.unassigned_blocks();
580 let to_allocate = need.saturating_sub(have);
581
582 if to_allocate > 0 && !self.inner.allocate_blocks(to_allocate, manager) {
583 return Err(ScheduleError::AllocationFailed {
584 needed: to_allocate,
585 });
586 }
587
588 self.state = SequenceState::DecodeScheduled {
589 blocks_allocated: to_allocate,
590 };
591 self.delegate.on_event(&SequenceEvent::DecodeScheduled {
592 blocks_allocated: to_allocate,
593 });
594
595 Ok(())
596 }
597
598 pub fn apply_decode(
600 &mut self,
601 token: Token,
602 manager: &BlockManager<T>,
603 ) -> Result<DecodeOutcome, ApplyError> {
604 let _blocks_allocated = match self.state {
605 SequenceState::DecodeScheduled { blocks_allocated } => blocks_allocated,
606 other => {
607 return Err(ApplyError::WrongState {
608 expected: "DecodeScheduled",
609 actual: other,
610 });
611 }
612 };
613
614 let crossed = self.inner.append_token(token);
615 let block_completed = crossed.is_some();
616
617 self.inner.complete_and_register_pending(manager);
621
622 self.kv_position += 1;
623 self.state = SequenceState::Idle;
624 self.delegate.on_event(&SequenceEvent::DecodeApplied {
625 token,
626 block_completed,
627 });
628
629 let is_complete = self.inner.is_complete();
630 Ok(match (block_completed, is_complete) {
631 (false, false) => DecodeOutcome::Continue,
632 (true, false) => DecodeOutcome::BlockCompleted,
633 (false, true) => DecodeOutcome::MaxLength,
634 (true, true) => DecodeOutcome::BlockCompletedAndMaxLength,
635 })
636 }
637
638 pub fn schedule_speculative(
648 &mut self,
649 num_draft_tokens: usize,
650 manager: &BlockManager<T>,
651 ) -> Result<(), ScheduleError> {
652 self.require_idle()?;
653 self.require_prefill_complete()?;
654 self.require_not_complete()?;
655 self.require_one_dangling()?;
656
657 let num_draft_tokens = num_draft_tokens.min(self.inner.remaining_tokens());
659
660 let bs = self.inner.block_size();
661 let future_total = self.inner.total_tokens() + num_draft_tokens;
662 let future_blocks = future_total.div_ceil(bs);
663 let have = self.inner.assigned_blocks()
664 + self.inner.staged_blocks()
665 + self.inner.unassigned_blocks();
666 let to_allocate = future_blocks.saturating_sub(have);
667
668 if to_allocate > 0 && !self.inner.allocate_blocks(to_allocate, manager) {
669 return Err(ScheduleError::AllocationFailed {
670 needed: to_allocate,
671 });
672 }
673
674 self.state = SequenceState::SpeculativeScheduled {
675 num_tokens: num_draft_tokens,
676 blocks_allocated: to_allocate,
677 };
678 self.delegate
679 .on_event(&SequenceEvent::SpeculativeScheduled {
680 num_tokens: num_draft_tokens,
681 blocks_allocated: to_allocate,
682 });
683
684 Ok(())
685 }
686
687 pub fn apply_speculative(
693 &mut self,
694 accepted: &[Token],
695 manager: &BlockManager<T>,
696 ) -> Result<DecodeOutcome, ApplyError> {
697 let (scheduled_tokens, _blocks_allocated) = match self.state {
698 SequenceState::SpeculativeScheduled {
699 num_tokens,
700 blocks_allocated,
701 } => (num_tokens, blocks_allocated),
702 other => {
703 return Err(ApplyError::WrongState {
704 expected: "SpeculativeScheduled",
705 actual: other,
706 });
707 }
708 };
709
710 if accepted.len() > scheduled_tokens {
711 return Err(ApplyError::AcceptedExceedsScheduled {
712 accepted: accepted.len(),
713 scheduled: scheduled_tokens,
714 });
715 }
716
717 let mut block_completed = false;
719 for &token in accepted {
720 let crossed = self.inner.append_token(token);
721 if crossed.is_some() {
722 block_completed = true;
723 }
724 }
725
726 self.inner.complete_and_register_pending(manager);
729
730 self.kv_position += accepted.len();
731
732 let excess = self.lifo_drop_excess_unassigned();
736
737 self.state = SequenceState::Idle;
738 self.delegate.on_event(&SequenceEvent::SpeculativeApplied {
739 accepted: accepted.len(),
740 scheduled: scheduled_tokens,
741 blocks_released: excess,
742 });
743
744 let is_complete = self.inner.is_complete();
745 Ok(match (block_completed, is_complete) {
746 (false, false) => DecodeOutcome::Continue,
747 (true, false) => DecodeOutcome::BlockCompleted,
748 (false, true) => DecodeOutcome::MaxLength,
749 (true, true) => DecodeOutcome::BlockCompletedAndMaxLength,
750 })
751 }
752
753 pub fn revert_schedule(&mut self) -> Result<(), ApplyError> {
763 let blocks_to_release = match self.state {
764 SequenceState::PrefillScheduled {
765 blocks_allocated, ..
766 } => blocks_allocated,
767 SequenceState::DecodeScheduled { blocks_allocated } => blocks_allocated,
768 SequenceState::SpeculativeScheduled {
769 blocks_allocated, ..
770 } => blocks_allocated,
771 other => {
772 return Err(ApplyError::WrongState {
773 expected: "any Scheduled state",
774 actual: other,
775 });
776 }
777 };
778
779 self.lifo_pop_unassigned(blocks_to_release);
780
781 self.state = SequenceState::Idle;
782 self.delegate.on_event(&SequenceEvent::ScheduleReverted {
783 blocks_released: blocks_to_release,
784 });
785
786 Ok(())
787 }
788
789 pub fn drop_unassigned(&mut self, count: usize) -> usize {
796 assert!(
797 self.state == SequenceState::Idle,
798 "drop_unassigned called in non-Idle state: {:?}",
799 self.state
800 );
801 let dropped = self.lifo_pop_unassigned(count);
802 if dropped > 0 {
803 self.delegate
804 .on_event(&SequenceEvent::UnassignedDropped { count: dropped });
805 }
806 dropped
807 }
808
809 pub fn release(&mut self) -> Result<(), ApplyError> {
815 self.require_idle_for_apply()?;
816 self.inner.release();
817 self.delegate.on_event(&SequenceEvent::Released);
818 Ok(())
819 }
820
821 pub fn reacquire(&mut self, manager: &BlockManager<T>) -> Result<bool, ApplyError> {
823 self.require_idle_for_apply()?;
824 let success = self.inner.reacquire(manager);
825 let prefix_matched = self.inner.prefix_matched_blocks();
826 self.delegate.on_event(&SequenceEvent::Reacquired {
827 prefix_matched,
828 success,
829 });
830 Ok(success)
831 }
832
833 pub fn append_tokens(&mut self, tokens: &[Token]) -> Result<(), ApplyError> {
841 self.require_idle_for_apply()?;
842 let remaining = self.inner.remaining_tokens();
843 if tokens.len() > remaining {
844 return Err(ApplyError::AppendExceedsRemaining {
845 requested: tokens.len(),
846 remaining,
847 });
848 }
849 for &token in tokens {
850 self.inner.append_token(token);
851 }
852 Ok(())
853 }
854
855 pub fn state(&self) -> SequenceState {
861 self.state
862 }
863
864 pub fn prefill_position(&self) -> usize {
866 self.prefill_position
867 }
868
869 pub fn is_prefill_complete(&self) -> bool {
871 self.prefill_position >= self.inner.num_input_tokens()
872 }
873
874 pub fn kv_position(&self) -> usize {
876 self.kv_position
877 }
878
879 pub fn tail_tokens(&self) -> usize {
882 self.inner.total_tokens().saturating_sub(self.kv_position)
883 }
884
885 pub fn delegate(&self) -> &Arc<dyn SequenceDelegate> {
887 &self.delegate
888 }
889
890 delegate_to_inner! {
893 pub fn generated_tokens(&self) -> usize;
894 pub fn max_output_tokens(&self) -> usize;
895 pub fn num_input_tokens(&self) -> usize;
896 pub fn total_tokens(&self) -> usize;
897 pub fn remaining_tokens(&self) -> usize;
898 pub fn num_blocks(&self) -> usize;
899 pub fn assigned_blocks(&self) -> usize;
900 pub fn staged_blocks(&self) -> usize;
901 pub fn unassigned_blocks(&self) -> usize;
902 pub fn prefix_matched_blocks(&self) -> usize;
903 pub fn block_size(&self) -> usize;
904 pub fn is_complete(&self) -> bool;
905 }
906
907 pub fn inner(&self) -> &RequestSequence<T> {
909 &self.inner
910 }
911
912 #[allow(dead_code)]
914 pub(crate) fn inner_mut(&mut self) -> &mut RequestSequence<T> {
915 &mut self.inner
916 }
917
918 fn require_idle(&self) -> Result<(), ScheduleError> {
923 if self.state != SequenceState::Idle {
924 return Err(ScheduleError::NotIdle { state: self.state });
925 }
926 Ok(())
927 }
928
929 fn require_idle_for_apply(&self) -> Result<(), ApplyError> {
930 if self.state != SequenceState::Idle {
931 return Err(ApplyError::WrongState {
932 expected: "Idle",
933 actual: self.state,
934 });
935 }
936 Ok(())
937 }
938
939 fn require_prefill_complete(&self) -> Result<(), ScheduleError> {
940 if !self.is_prefill_complete() {
941 return Err(ScheduleError::PrefillNotComplete {
942 position: self.prefill_position,
943 num_input_tokens: self.inner.num_input_tokens(),
944 });
945 }
946 Ok(())
947 }
948
949 fn require_not_complete(&self) -> Result<(), ScheduleError> {
950 if self.inner.is_complete() {
951 return Err(ScheduleError::GenerationComplete {
952 generated: self.inner.generated_tokens(),
953 max_output: self.inner.max_output_tokens(),
954 });
955 }
956 Ok(())
957 }
958
959 fn require_one_dangling(&self) -> Result<(), ScheduleError> {
960 let dangling = self.tail_tokens();
961 if dangling != 1 {
962 return Err(ScheduleError::WrongDanglingCount {
963 expected: 1,
964 actual: dangling,
965 });
966 }
967 Ok(())
968 }
969
970 fn lifo_pop_unassigned(&mut self, count: usize) -> usize {
972 let assignments = self.inner.assignments_mut();
973 let mut dropped = 0;
974 for _ in 0..count {
975 if assignments.pop_last_unassigned().is_some() {
976 dropped += 1;
977 } else {
978 break;
979 }
980 }
981 dropped
982 }
983
984 fn lifo_drop_excess_unassigned(&mut self) -> usize {
987 let bs = self.inner.block_size();
988 let total = self.inner.total_tokens();
989 let need_gen = if self.inner.is_complete() {
992 0
993 } else if !total.is_multiple_of(bs) {
994 1
996 } else {
997 1
1000 };
1001
1002 let current = self.inner.unassigned_blocks();
1003 let excess = current.saturating_sub(need_gen);
1004 self.lifo_pop_unassigned(excess)
1005 }
1006}
1007
1008impl<T: BlockMetadata> std::fmt::Debug for SchedulableSequence<T> {
1009 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1010 f.debug_struct("SchedulableSequence")
1011 .field("state", &self.state)
1012 .field("prefill_position", &self.prefill_position)
1013 .field("kv_position", &self.kv_position)
1014 .field("inner", &self.inner)
1015 .finish()
1016 }
1017}
1018
1019#[cfg(test)]
1020mod tests {
1021 use super::*;
1022 use crate::testing::{TestMeta, create_test_manager};
1023 use std::sync::Mutex;
1024
1025 const BLOCK_SIZE: u32 = 4;
1026
1027 struct CollectingDelegate {
1032 events: Mutex<Vec<SequenceEvent>>,
1033 }
1034
1035 impl CollectingDelegate {
1036 fn new() -> Self {
1037 Self {
1038 events: Mutex::new(Vec::new()),
1039 }
1040 }
1041
1042 fn events(&self) -> Vec<SequenceEvent> {
1043 self.events.lock().unwrap().clone()
1044 }
1045 }
1046
1047 impl SequenceDelegate for CollectingDelegate {
1048 fn on_event(&self, event: &SequenceEvent) {
1049 self.events.lock().unwrap().push(event.clone());
1050 }
1051 }
1052
1053 fn noop_delegate() -> Option<Arc<dyn SequenceDelegate>> {
1054 None
1055 }
1056
1057 fn make_tokens(n: usize) -> Vec<Token> {
1058 (0..n as u32).collect()
1059 }
1060
1061 #[test]
1066 fn test_new_starts_idle() {
1067 let delegate = Arc::new(CollectingDelegate::new());
1068 let seq = SchedulableSequence::<TestMeta>::new(
1069 make_tokens(8),
1070 10,
1071 BLOCK_SIZE,
1072 Some(delegate.clone()),
1073 );
1074
1075 assert_eq!(seq.state(), SequenceState::Idle);
1076 assert_eq!(seq.prefill_position(), 0);
1077 assert_eq!(seq.kv_position(), 0);
1078 assert_eq!(seq.tail_tokens(), 8);
1079 assert_eq!(seq.num_input_tokens(), 8);
1080 assert_eq!(seq.max_output_tokens(), 10);
1081 assert_eq!(seq.block_size(), BLOCK_SIZE as usize);
1082 assert!(!seq.is_prefill_complete());
1083
1084 let events = delegate.events();
1085 assert_eq!(events.len(), 1);
1086 assert_eq!(
1087 events[0],
1088 SequenceEvent::Created {
1089 num_input_tokens: 8,
1090 max_output_tokens: 10,
1091 block_size: BLOCK_SIZE as usize,
1092 }
1093 );
1094 }
1095
1096 #[test]
1101 fn test_schedule_prefill_requires_idle() {
1102 let manager = create_test_manager::<TestMeta>(20);
1103 let mut seq =
1104 SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1105
1106 seq.schedule_prefill(4, &manager).unwrap();
1107 let err = seq.schedule_prefill(4, &manager).unwrap_err();
1108 assert!(matches!(err, ScheduleError::NotIdle { .. }));
1109 }
1110
1111 #[test]
1112 fn test_schedule_decode_requires_idle() {
1113 let manager = create_test_manager::<TestMeta>(20);
1114 let mut seq =
1115 SchedulableSequence::<TestMeta>::new(make_tokens(4), 10, BLOCK_SIZE, noop_delegate());
1116
1117 seq.schedule_prefill(4, &manager).unwrap();
1119 seq.apply_prefill(Some(1000), &manager).unwrap();
1120
1121 seq.schedule_decode(&manager).unwrap();
1123 let err = seq.schedule_decode(&manager).unwrap_err();
1124 assert!(matches!(err, ScheduleError::NotIdle { .. }));
1125 }
1126
1127 #[test]
1128 fn test_apply_prefill_requires_scheduled() {
1129 let manager = create_test_manager::<TestMeta>(20);
1130 let mut seq =
1131 SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1132
1133 let err = seq.apply_prefill(None, &manager).unwrap_err();
1134 assert!(matches!(err, ApplyError::WrongState { .. }));
1135 }
1136
1137 #[test]
1138 fn test_apply_decode_requires_scheduled() {
1139 let manager = create_test_manager::<TestMeta>(20);
1140 let mut seq =
1141 SchedulableSequence::<TestMeta>::new(make_tokens(4), 10, BLOCK_SIZE, noop_delegate());
1142
1143 let err = seq.apply_decode(100, &manager).unwrap_err();
1144 assert!(matches!(err, ApplyError::WrongState { .. }));
1145 }
1146
1147 #[test]
1148 fn test_decode_requires_prefill_complete() {
1149 let manager = create_test_manager::<TestMeta>(20);
1150 let mut seq =
1151 SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1152
1153 let err = seq.schedule_decode(&manager).unwrap_err();
1154 assert!(matches!(err, ScheduleError::PrefillNotComplete { .. }));
1155 }
1156
1157 #[test]
1158 fn test_speculative_requires_prefill_complete() {
1159 let manager = create_test_manager::<TestMeta>(20);
1160 let mut seq =
1161 SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1162
1163 let err = seq.schedule_speculative(3, &manager).unwrap_err();
1164 assert!(matches!(err, ScheduleError::PrefillNotComplete { .. }));
1165 }
1166
1167 #[test]
1172 fn test_prefill_single_chunk() {
1173 let manager = create_test_manager::<TestMeta>(20);
1174 let mut seq =
1175 SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1176
1177 seq.schedule_prefill(8, &manager).unwrap();
1179 assert_eq!(
1180 seq.state(),
1181 SequenceState::PrefillScheduled {
1182 num_tokens: 8,
1183 blocks_allocated: 2, }
1185 );
1186
1187 seq.apply_prefill(Some(1000), &manager).unwrap();
1188 assert_eq!(seq.state(), SequenceState::Idle);
1189 assert_eq!(seq.prefill_position(), 8);
1190 assert_eq!(seq.kv_position(), 8);
1191 assert!(seq.is_prefill_complete());
1192 assert_eq!(seq.assigned_blocks(), 2);
1193 assert_eq!(seq.unassigned_blocks(), 0); assert_eq!(seq.tail_tokens(), 1); }
1196
1197 #[test]
1198 fn test_prefill_chunked() {
1199 let manager = create_test_manager::<TestMeta>(20);
1200 let mut seq =
1201 SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1202
1203 seq.schedule_prefill(4, &manager).unwrap();
1205 seq.apply_prefill(None, &manager).unwrap();
1206 assert_eq!(seq.prefill_position(), 4);
1207 assert_eq!(seq.kv_position(), 4);
1208 assert!(!seq.is_prefill_complete());
1209 assert_eq!(seq.assigned_blocks(), 1);
1210
1211 seq.schedule_prefill(4, &manager).unwrap();
1213 seq.apply_prefill(Some(1000), &manager).unwrap();
1214 assert_eq!(seq.prefill_position(), 8);
1215 assert_eq!(seq.kv_position(), 8);
1216 assert!(seq.is_prefill_complete());
1217 assert_eq!(seq.assigned_blocks(), 2);
1218 assert_eq!(seq.unassigned_blocks(), 0); assert_eq!(seq.tail_tokens(), 1);
1220 }
1221
1222 #[test]
1223 fn test_prefill_final_with_first_token() {
1224 let manager = create_test_manager::<TestMeta>(20);
1225 let mut seq =
1226 SchedulableSequence::<TestMeta>::new(make_tokens(4), 10, BLOCK_SIZE, noop_delegate());
1227
1228 seq.schedule_prefill(4, &manager).unwrap();
1229 seq.apply_prefill(Some(100), &manager).unwrap();
1230
1231 assert!(seq.is_prefill_complete());
1232 assert_eq!(seq.generated_tokens(), 1);
1233 assert_eq!(seq.total_tokens(), 5);
1234 assert_eq!(seq.kv_position(), 4);
1235 assert_eq!(seq.tail_tokens(), 1);
1236 }
1237
1238 #[test]
1239 fn test_prefill_token_on_non_final_rejected() {
1240 let manager = create_test_manager::<TestMeta>(20);
1241 let mut seq =
1242 SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1243
1244 seq.schedule_prefill(4, &manager).unwrap();
1245 let err = seq.apply_prefill(Some(100), &manager).unwrap_err();
1246 assert!(matches!(err, ApplyError::TokenOnNonFinalChunk));
1247 }
1248
1249 #[test]
1250 fn test_prefill_overrun_rejected() {
1251 let manager = create_test_manager::<TestMeta>(20);
1252 let mut seq =
1253 SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1254
1255 let err = seq.schedule_prefill(9, &manager).unwrap_err();
1256 assert!(matches!(err, ScheduleError::PrefillOverrun { .. }));
1257 }
1258
1259 #[test]
1260 fn test_prefill_allocation_failure() {
1261 let manager = create_test_manager::<TestMeta>(1); let mut seq =
1263 SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1264
1265 let err = seq.schedule_prefill(8, &manager).unwrap_err();
1266 assert!(matches!(err, ScheduleError::AllocationFailed { .. }));
1267 assert_eq!(seq.state(), SequenceState::Idle);
1269 }
1270
1271 #[test]
1272 fn test_schedule_prefill_after_complete_rejected() {
1273 let manager = create_test_manager::<TestMeta>(20);
1274 let mut seq =
1275 SchedulableSequence::<TestMeta>::new(make_tokens(4), 10, BLOCK_SIZE, noop_delegate());
1276
1277 seq.schedule_prefill(4, &manager).unwrap();
1278 seq.apply_prefill(Some(1000), &manager).unwrap();
1279
1280 let err = seq.schedule_prefill(1, &manager).unwrap_err();
1281 assert!(matches!(err, ScheduleError::PrefillComplete));
1282 }
1283
1284 #[test]
1285 fn test_apply_prefill_none_on_final_rejected() {
1286 let manager = create_test_manager::<TestMeta>(20);
1287 let mut seq =
1288 SchedulableSequence::<TestMeta>::new(make_tokens(4), 10, BLOCK_SIZE, noop_delegate());
1289
1290 seq.schedule_prefill(4, &manager).unwrap();
1291 let err = seq.apply_prefill(None, &manager).unwrap_err();
1292 assert!(matches!(err, ApplyError::MissingTokenOnFinalChunk));
1293 }
1294
1295 fn prefilled_seq(
1304 num_input: usize,
1305 max_output: usize,
1306 manager: &BlockManager<TestMeta>,
1307 ) -> SchedulableSequence<TestMeta> {
1308 let mut seq = SchedulableSequence::new(
1309 make_tokens(num_input),
1310 max_output,
1311 BLOCK_SIZE,
1312 noop_delegate(),
1313 );
1314 if num_input > 0 {
1315 seq.schedule_prefill(num_input, manager).unwrap();
1316 seq.apply_prefill(Some(1000), manager).unwrap();
1317 }
1318 seq
1319 }
1320
1321 #[test]
1322 fn test_decode_continue() {
1323 let manager = create_test_manager::<TestMeta>(20);
1324 let mut seq = prefilled_seq(5, 10, &manager);
1325 seq.schedule_decode(&manager).unwrap();
1328 let outcome = seq.apply_decode(100, &manager).unwrap();
1329 assert_eq!(outcome, DecodeOutcome::Continue);
1330 assert_eq!(seq.generated_tokens(), 2); assert_eq!(seq.state(), SequenceState::Idle);
1332 }
1333
1334 #[test]
1335 fn test_decode_block_completed() {
1336 let manager = create_test_manager::<TestMeta>(20);
1337 let mut seq = prefilled_seq(4, 10, &manager);
1338 for _ in 0..2 {
1342 seq.schedule_decode(&manager).unwrap();
1343 let outcome = seq.apply_decode(100, &manager).unwrap();
1344 assert_eq!(outcome, DecodeOutcome::Continue);
1345 }
1346 seq.schedule_decode(&manager).unwrap();
1347 let outcome = seq.apply_decode(100, &manager).unwrap();
1348 assert_eq!(outcome, DecodeOutcome::BlockCompleted);
1349 assert_eq!(seq.assigned_blocks(), 2);
1350 }
1351
1352 #[test]
1353 fn test_decode_max_length() {
1354 let manager = create_test_manager::<TestMeta>(20);
1355 let mut seq = prefilled_seq(5, 2, &manager);
1356 seq.schedule_decode(&manager).unwrap();
1359 let outcome = seq.apply_decode(100, &manager).unwrap();
1360 assert_eq!(outcome, DecodeOutcome::MaxLength);
1361 assert!(seq.is_complete());
1362 }
1363
1364 #[test]
1365 fn test_decode_block_and_max() {
1366 let manager = create_test_manager::<TestMeta>(20);
1367 let mut seq = prefilled_seq(4, 4, &manager);
1368 for _ in 0..2 {
1372 seq.schedule_decode(&manager).unwrap();
1373 seq.apply_decode(100, &manager).unwrap();
1374 }
1375 seq.schedule_decode(&manager).unwrap();
1376 let outcome = seq.apply_decode(100, &manager).unwrap();
1377 assert_eq!(outcome, DecodeOutcome::BlockCompletedAndMaxLength);
1378 }
1379
1380 #[test]
1381 fn test_decode_allocates_gen_block() {
1382 let manager = create_test_manager::<TestMeta>(20);
1383 let mut seq = prefilled_seq(4, 10, &manager);
1384 seq.schedule_decode(&manager).unwrap();
1388 assert_eq!(
1389 seq.state(),
1390 SequenceState::DecodeScheduled {
1391 blocks_allocated: 1
1392 }
1393 );
1394 assert_eq!(seq.unassigned_blocks(), 1);
1395
1396 seq.apply_decode(100, &manager).unwrap(); seq.schedule_decode(&manager).unwrap();
1399 seq.apply_decode(101, &manager).unwrap(); seq.schedule_decode(&manager).unwrap();
1401 let outcome = seq.apply_decode(102, &manager).unwrap(); assert_eq!(outcome, DecodeOutcome::BlockCompleted);
1403 assert_eq!(seq.unassigned_blocks(), 0);
1404
1405 seq.schedule_decode(&manager).unwrap();
1407 assert_eq!(
1408 seq.state(),
1409 SequenceState::DecodeScheduled {
1410 blocks_allocated: 1
1411 }
1412 );
1413 assert_eq!(seq.unassigned_blocks(), 1);
1414 }
1415
1416 #[test]
1417 fn test_decode_generation_complete_rejected() {
1418 let manager = create_test_manager::<TestMeta>(20);
1419 let mut seq = prefilled_seq(5, 2, &manager);
1420 seq.schedule_decode(&manager).unwrap();
1423 seq.apply_decode(100, &manager).unwrap();
1424 assert!(seq.is_complete()); let err = seq.schedule_decode(&manager).unwrap_err();
1427 assert!(matches!(err, ScheduleError::GenerationComplete { .. }));
1428 }
1429
1430 #[test]
1435 fn test_speculative_basic() {
1436 let manager = create_test_manager::<TestMeta>(20);
1437 let mut seq = prefilled_seq(8, 10, &manager);
1438 seq.schedule_speculative(2, &manager).unwrap();
1442 assert!(matches!(
1443 seq.state(),
1444 SequenceState::SpeculativeScheduled { num_tokens: 2, .. }
1445 ));
1446
1447 let outcome = seq.apply_speculative(&[100, 101], &manager).unwrap();
1449 assert_eq!(outcome, DecodeOutcome::Continue);
1450 assert_eq!(seq.generated_tokens(), 3); assert_eq!(seq.state(), SequenceState::Idle);
1452 }
1453
#[test]
/// Schedules four speculative tokens but accepts only two, then verifies the
/// counters and the `SpeculativeApplied` event reported to the delegate.
fn test_speculative_partial_accept() {
    let manager = create_test_manager::<TestMeta>(20);
    let delegate = Arc::new(CollectingDelegate::new());
    let mut seq =
        SchedulableSequence::new(make_tokens(4), 10, BLOCK_SIZE, Some(delegate.clone()));
    seq.schedule_prefill(4, &manager).unwrap();
    seq.apply_prefill(Some(1000), &manager).unwrap();

    seq.schedule_speculative(4, &manager).unwrap();

    let outcome = seq.apply_speculative(&[100, 101], &manager).unwrap();
    assert_eq!(outcome, DecodeOutcome::Continue);
    assert_eq!(seq.generated_tokens(), 3);
    assert_eq!(seq.unassigned_blocks(), 1);

    // The most recent delegate event must describe the partial acceptance.
    let events = delegate.events();
    let last = events.last().unwrap();
    match last {
        SequenceEvent::SpeculativeApplied {
            accepted,
            scheduled,
            ..
        } => {
            assert_eq!(*accepted, 2);
            assert_eq!(*scheduled, 4);
        }
        _ => panic!("expected SpeculativeApplied"),
    }
}
1492
#[test]
/// Schedules five speculative tokens and accepts exactly one of them.
fn test_speculative_single_accept() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(4, 10, &pool);

    sequence.schedule_speculative(5, &pool).unwrap();

    let result = sequence.apply_speculative(&[100], &pool).unwrap();
    assert_eq!(result, DecodeOutcome::Continue);
    assert_eq!(sequence.generated_tokens(), 2);
}
1503
#[test]
/// Schedules three speculative tokens and accepts none of them; the block
/// pool must not shrink relative to its level right after scheduling.
fn test_speculative_zero_accept() {
    let manager = create_test_manager::<TestMeta>(20);
    let mut seq = prefilled_seq(4, 10, &manager);

    seq.schedule_speculative(3, &manager).unwrap();
    let avail_after_schedule = manager.available_blocks();

    let outcome = seq.apply_speculative(&[], &manager).unwrap();
    assert_eq!(outcome, DecodeOutcome::Continue);
    assert_eq!(seq.generated_tokens(), 1);
    assert_eq!(seq.unassigned_blocks(), 1);
    // Applying an empty acceptance may hand blocks back, never take more.
    assert!(manager.available_blocks() >= avail_after_schedule);
}
1521
#[test]
/// Accepting three tokens when only two were scheduled must be rejected with
/// `ApplyError::AcceptedExceedsScheduled`.
fn test_speculative_exceeds_scheduled_rejected() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(4, 10, &pool);

    sequence.schedule_speculative(2, &pool).unwrap();

    let rejection = sequence
        .apply_speculative(&[100, 101, 102], &pool)
        .unwrap_err();
    let is_expected = matches!(
        rejection,
        ApplyError::AcceptedExceedsScheduled {
            accepted: 3,
            scheduled: 2,
        }
    );
    assert!(is_expected);
}
1539
#[test]
/// Accepts five speculative tokens in one step and expects the step to report
/// a completed block.
fn test_speculative_block_boundaries() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(7, 20, &pool);

    sequence.schedule_speculative(5, &pool).unwrap();
    let result = sequence
        .apply_speculative(&[100, 101, 102, 103, 104], &pool)
        .unwrap();

    assert_eq!(result, DecodeOutcome::BlockCompleted);
    assert_eq!(sequence.generated_tokens(), 6);
    assert_eq!(sequence.assigned_blocks(), 3);
}
1556
#[test]
/// Reverting a scheduled prefill returns the sequence to `Idle` and restores
/// the manager's available-block count.
fn test_revert_prefill() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    let baseline = pool.available_blocks();

    // Scheduling must take blocks out of the pool.
    sequence.schedule_prefill(4, &pool).unwrap();
    assert!(pool.available_blocks() < baseline);

    // Reverting must give every one of them back.
    sequence.revert_schedule().unwrap();
    assert_eq!(sequence.state(), SequenceState::Idle);
    assert_eq!(pool.available_blocks(), baseline);
}
1575
#[test]
/// Reverting a scheduled decode that allocated one block hands that block
/// back to the manager.
fn test_revert_decode() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(4, 10, &pool);

    // Three decode round-trips leave the sequence with no unassigned blocks.
    for _round in 0..3 {
        sequence.schedule_decode(&pool).unwrap();
        sequence.apply_decode(100, &pool).unwrap();
    }
    assert_eq!(sequence.unassigned_blocks(), 0);

    let baseline = pool.available_blocks();
    sequence.schedule_decode(&pool).unwrap();
    assert_eq!(pool.available_blocks(), baseline - 1);

    sequence.revert_schedule().unwrap();
    assert_eq!(sequence.state(), SequenceState::Idle);
    assert_eq!(pool.available_blocks(), baseline);
}
1597
#[test]
/// Reverting a scheduled speculative step returns the sequence to `Idle` and
/// restores the manager's available-block count.
fn test_revert_speculative() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(4, 10, &pool);

    let baseline = pool.available_blocks();
    sequence.schedule_speculative(4, &pool).unwrap();
    // Number of blocks the schedule call took from the pool.
    let allocated = baseline - pool.available_blocks();

    sequence.revert_schedule().unwrap();
    assert_eq!(sequence.state(), SequenceState::Idle);
    assert_eq!(pool.available_blocks(), baseline);
    assert!(allocated > 0 || sequence.unassigned_blocks() > 0);
}
1613
#[test]
/// A full-prompt prefill schedule followed by a revert leaves the manager
/// with exactly as many available blocks as before.
fn test_revert_returns_blocks_to_manager() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    let baseline = pool.available_blocks();

    sequence.schedule_prefill(8, &pool).unwrap();
    let while_scheduled = pool.available_blocks();
    assert!(while_scheduled < baseline);

    sequence.revert_schedule().unwrap();
    assert_eq!(pool.available_blocks(), baseline);
}
1628
#[test]
/// Dropping one unassigned block when exactly one is held reports one dropped
/// and leaves none behind.
fn test_drop_unassigned_lifo() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(4, 10, &pool);

    sequence.schedule_decode(&pool).unwrap();
    sequence.apply_decode(100, &pool).unwrap();
    assert_eq!(sequence.unassigned_blocks(), 1);

    let released = sequence.drop_unassigned(1);
    assert_eq!(released, 1);
    assert_eq!(sequence.unassigned_blocks(), 0);
}
1648
#[test]
/// Asking to drop more unassigned blocks than are held drops only what exists.
fn test_drop_unassigned_partial() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(8, 10, &pool);

    sequence.schedule_decode(&pool).unwrap();
    sequence.apply_decode(100, &pool).unwrap();
    assert_eq!(sequence.unassigned_blocks(), 1);

    // Request five, but only one block is available to drop.
    let released = sequence.drop_unassigned(5);
    assert_eq!(released, 1);
    assert_eq!(sequence.unassigned_blocks(), 0);
}
1665
#[test]
/// Dropping zero unassigned blocks is a no-op.
fn test_drop_unassigned_zero() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence = prefilled_seq(4, 10, &pool);

    sequence.schedule_decode(&pool).unwrap();
    sequence.apply_decode(100, &pool).unwrap();

    let released = sequence.drop_unassigned(0);
    assert_eq!(released, 0);
    // The block held before the call is still there.
    assert_eq!(sequence.unassigned_blocks(), 1);
}
1679
#[test]
/// Drives a sequence through creation, prefill, two decodes and release, and
/// checks that the delegate observed the eight events in order.
fn test_delegate_full_lifecycle() {
    let pool = create_test_manager::<TestMeta>(20);
    let delegate = Arc::new(CollectingDelegate::new());
    let mut sequence = SchedulableSequence::<TestMeta>::new(
        make_tokens(4),
        3,
        BLOCK_SIZE,
        Some(delegate.clone()),
    );

    sequence.schedule_prefill(4, &pool).unwrap();
    sequence.apply_prefill(Some(1000), &pool).unwrap();

    // Two decode round-trips, one per generated token.
    for token in [100, 101] {
        sequence.schedule_decode(&pool).unwrap();
        sequence.apply_decode(token, &pool).unwrap();
    }

    sequence.release().unwrap();

    let history = delegate.events();
    assert_eq!(history.len(), 8);

    assert!(matches!(history[0], SequenceEvent::Created { .. }));
    assert!(matches!(history[1], SequenceEvent::PrefillScheduled { .. }));
    assert!(matches!(history[2], SequenceEvent::PrefillApplied { .. }));
    assert!(matches!(history[3], SequenceEvent::DecodeScheduled { .. }));
    assert!(matches!(history[4], SequenceEvent::DecodeApplied { .. }));
    assert!(matches!(history[5], SequenceEvent::DecodeScheduled { .. }));
    assert!(matches!(history[6], SequenceEvent::DecodeApplied { .. }));
    assert!(matches!(history[7], SequenceEvent::Released));
}
1723
#[test]
/// Runs prefill, six decode steps up to the output limit, and a final release.
fn test_full_lifecycle_prefill_decode_release() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(6), 7, BLOCK_SIZE, noop_delegate());

    sequence.schedule_prefill(6, &pool).unwrap();
    sequence.apply_prefill(Some(1000), &pool).unwrap();

    assert!(sequence.is_prefill_complete());
    assert_eq!(sequence.assigned_blocks(), 1);
    assert_eq!(sequence.unassigned_blocks(), 1);

    for i in 0..6u32 {
        sequence.schedule_decode(&pool).unwrap();
        let outcome = sequence.apply_decode(100 + i, &pool).unwrap();

        if i == 5 {
            // Only the final step is allowed to report the length limit.
            assert!(
                matches!(
                    outcome,
                    DecodeOutcome::MaxLength | DecodeOutcome::BlockCompletedAndMaxLength
                ),
                "last token should hit max length, got: {outcome:?}"
            );
        } else if !matches!(
            outcome,
            DecodeOutcome::Continue | DecodeOutcome::BlockCompleted
        ) {
            let other = outcome;
            panic!("unexpected outcome at token {i}: {other:?}");
        }
    }

    assert!(sequence.is_complete());
    assert_eq!(sequence.generated_tokens(), 7);
    assert_eq!(sequence.total_tokens(), 13);

    sequence.release().unwrap();
    assert_eq!(sequence.assigned_blocks(), 0);
}
1769
#[test]
/// Releases a partially decoded sequence, reacquires its blocks, and checks
/// that decoding can continue with the generated-token count preserved.
fn test_preempt_and_reacquire() {
    let manager = create_test_manager::<TestMeta>(20);
    let mut seq =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    seq.schedule_prefill(8, &manager).unwrap();
    seq.apply_prefill(Some(1000), &manager).unwrap();

    for _ in 0..2 {
        seq.schedule_decode(&manager).unwrap();
        seq.apply_decode(100, &manager).unwrap();
    }
    assert_eq!(seq.generated_tokens(), 3);

    // Release: every assigned block is given up.
    seq.release().unwrap();
    assert_eq!(seq.assigned_blocks(), 0);

    // Reacquire: blocks come back and the token count is unchanged.
    let success = seq.reacquire(&manager).unwrap();
    assert!(success);
    assert_eq!(seq.assigned_blocks(), 2);
    assert_eq!(seq.unassigned_blocks(), 0);
    assert_eq!(seq.generated_tokens(), 3);

    // Decoding resumes normally; only success and the counter are checked,
    // the specific outcome is not.
    seq.schedule_decode(&manager).unwrap();
    seq.apply_decode(200, &manager).unwrap();
    assert_eq!(seq.generated_tokens(), 4);
}
1804
#[test]
/// Registers one block for the first four prompt tokens, then checks that a
/// new sequence over the same prompt matches it as a prefix.
fn test_match_and_add_prefix() {
    let pool = create_test_manager::<TestMeta>(20);
    let prompt = make_tokens(8);

    // Populate the manager: complete and register one block built from the
    // first four tokens, then drop the handles.
    let seed_sequence = crate::BlockSequence::new(prompt[..4].to_vec(), BLOCK_SIZE, None);
    let fresh_blocks = pool.allocate_blocks(1).unwrap();
    let handles: Vec<_> = fresh_blocks
        .into_iter()
        .zip(seed_sequence.blocks().iter())
        .map(|(mutable, token_block)| pool.register_block(mutable.complete(token_block).unwrap()))
        .collect();
    drop(handles);

    let mut sequence =
        SchedulableSequence::<TestMeta>::new(prompt, 10, BLOCK_SIZE, noop_delegate());
    let hits = sequence.match_and_add_prefix(&pool).unwrap();
    assert_eq!(hits, 1);
    assert_eq!(sequence.prefill_position(), 4);
    assert_eq!(sequence.kv_position(), 4);
    assert_eq!(sequence.assigned_blocks(), 1);

    // Prefill the remaining four tokens.
    sequence.schedule_prefill(4, &pool).unwrap();
    sequence.apply_prefill(Some(1000), &pool).unwrap();
    assert_eq!(sequence.assigned_blocks(), 2);
    assert_eq!(sequence.unassigned_blocks(), 0);
    assert_eq!(sequence.tail_tokens(), 1);
}
1838
#[test]
/// With nothing registered in the manager, prefix matching finds no blocks
/// and leaves the prefill position at zero.
fn test_match_and_add_prefix_no_hits() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    let hits = sequence.match_and_add_prefix(&pool).unwrap();

    assert_eq!(hits, 0);
    assert_eq!(sequence.prefill_position(), 0);
}
1849
#[test]
/// A sequence created with no input tokens counts as prefill-complete, rejects
/// prefill and decode scheduling, and can decode once a token is appended.
fn test_empty_tokens_prefill() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(vec![], 10, BLOCK_SIZE, noop_delegate());

    assert!(sequence.is_prefill_complete());

    let prefill_rejection = sequence.schedule_prefill(0, &pool).unwrap_err();
    assert!(matches!(prefill_rejection, ScheduleError::PrefillComplete));

    let decode_rejection = sequence.schedule_decode(&pool).unwrap_err();
    assert!(matches!(
        decode_rejection,
        ScheduleError::WrongDanglingCount { .. }
    ));

    // Appending one token makes a decode step schedulable.
    sequence.append_tokens(&[100]).unwrap();
    assert_eq!(sequence.tail_tokens(), 1);

    sequence.schedule_decode(&pool).unwrap();
    let result = sequence.apply_decode(101, &pool).unwrap();
    assert_eq!(result, DecodeOutcome::Continue);
}
1877
#[test]
/// With a zero output budget, prefill leaves no unassigned block and decode
/// scheduling is rejected as `GenerationComplete`.
fn test_zero_max_output_no_gen_block() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(4), 0, BLOCK_SIZE, noop_delegate());

    sequence.schedule_prefill(4, &pool).unwrap();
    sequence.apply_prefill(None, &pool).unwrap();

    assert_eq!(sequence.assigned_blocks(), 1);
    assert_eq!(sequence.unassigned_blocks(), 0);

    let rejection = sequence.schedule_decode(&pool).unwrap_err();
    assert!(matches!(
        rejection,
        ScheduleError::GenerationComplete { .. }
    ));
}
1894
#[test]
/// The `Debug` rendering of a fresh sequence names the type and its `Idle` state.
fn test_debug_impl() {
    let seq =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    let rendered = format!("{seq:?}");

    for needle in ["SchedulableSequence", "Idle"] {
        assert!(rendered.contains(needle));
    }
}
1903
#[test]
/// Reverting when nothing has been scheduled fails with `ApplyError::WrongState`.
fn test_revert_idle_rejected() {
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    let rejection = sequence.revert_schedule().unwrap_err();

    assert!(matches!(rejection, ApplyError::WrongState { .. }));
}
1911
#[test]
/// Releasing while a prefill is still scheduled fails with
/// `ApplyError::WrongState`.
fn test_release_while_scheduled_rejected() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    sequence.schedule_prefill(4, &pool).unwrap();

    let rejection = sequence.release().unwrap_err();
    assert!(matches!(rejection, ApplyError::WrongState { .. }));
}
1922
#[test]
/// Reacquiring while a prefill is still scheduled fails with
/// `ApplyError::WrongState`.
fn test_reacquire_while_scheduled_rejected() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());

    sequence.schedule_prefill(4, &pool).unwrap();

    let rejection = sequence.reacquire(&pool).unwrap_err();
    assert!(matches!(rejection, ApplyError::WrongState { .. }));
}
1933
#[test]
/// Tracks `kv_position` and `tail_tokens` across prefill, a decode step and a
/// partially accepted speculative step.
fn test_dangling_tokens_tracking() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 20, BLOCK_SIZE, noop_delegate());

    // Fresh sequence: all eight input tokens are still in the tail.
    assert_eq!(sequence.kv_position(), 0);
    assert_eq!(sequence.tail_tokens(), 8);

    // After the full prefill with an emitted token.
    sequence.schedule_prefill(8, &pool).unwrap();
    sequence.apply_prefill(Some(1000), &pool).unwrap();
    assert_eq!(sequence.kv_position(), 8);
    assert_eq!(sequence.tail_tokens(), 1);

    // After one decode step.
    sequence.schedule_decode(&pool).unwrap();
    sequence.apply_decode(100, &pool).unwrap();
    assert_eq!(sequence.kv_position(), 9);
    assert_eq!(sequence.tail_tokens(), 1);

    // After accepting two of three scheduled speculative tokens.
    sequence.schedule_speculative(3, &pool).unwrap();
    sequence.apply_speculative(&[200, 201], &pool).unwrap();
    assert_eq!(sequence.kv_position(), 11);
    assert_eq!(sequence.tail_tokens(), 1);
}
1966
#[test]
/// Decode scheduling is rejected with `WrongDanglingCount` when the tail holds
/// zero tokens or two tokens instead of exactly one.
fn test_decode_requires_one_dangling() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(vec![], 10, BLOCK_SIZE, noop_delegate());

    // Empty sequence: nothing in the tail.
    let none_dangling = sequence.schedule_decode(&pool).unwrap_err();
    assert!(matches!(
        none_dangling,
        ScheduleError::WrongDanglingCount {
            expected: 1,
            actual: 0,
        }
    ));

    // Two appended tokens: one too many.
    sequence.append_tokens(&[100, 101]).unwrap();
    assert_eq!(sequence.tail_tokens(), 2);

    let two_dangling = sequence.schedule_decode(&pool).unwrap_err();
    assert!(matches!(
        two_dangling,
        ScheduleError::WrongDanglingCount {
            expected: 1,
            actual: 2,
        }
    ));
}
1994
#[test]
/// Appending one token to an empty sequence puts it in the tail without moving
/// `kv_position`; the following decode step advances `kv_position` by one.
fn test_append_tokens_creates_dangling() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(vec![], 10, BLOCK_SIZE, noop_delegate());

    assert_eq!(sequence.tail_tokens(), 0);

    sequence.append_tokens(&[100]).unwrap();
    assert_eq!(sequence.tail_tokens(), 1);
    assert_eq!(sequence.total_tokens(), 1);
    assert_eq!(sequence.kv_position(), 0);

    sequence.schedule_decode(&pool).unwrap();
    sequence.apply_decode(101, &pool).unwrap();
    assert_eq!(sequence.tail_tokens(), 1);
    assert_eq!(sequence.kv_position(), 1);
}
2013
#[test]
/// Appending two tokens with only one remaining fails with
/// `AppendExceedsRemaining` and leaves every counter untouched.
fn test_append_tokens_exceeding_remaining_returns_error_without_mutation() {
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(vec![], 1, BLOCK_SIZE, noop_delegate());

    let rejection = sequence.append_tokens(&[100, 101]).unwrap_err();
    let is_expected = matches!(
        rejection,
        ApplyError::AppendExceedsRemaining {
            requested: 2,
            remaining: 1,
        }
    );
    assert!(is_expected);

    // Nothing may have been mutated by the failed call.
    assert_eq!(sequence.generated_tokens(), 0);
    assert_eq!(sequence.remaining_tokens(), 1);
    assert_eq!(sequence.total_tokens(), 0);
    assert_eq!(sequence.tail_tokens(), 0);
    assert_eq!(sequence.kv_position(), 0);
}
2033
#[test]
/// Follows `kv_position` through two prefill chunks, one decode step and a
/// fully accepted speculative step.
fn test_kv_position_through_lifecycle() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(8), 20, BLOCK_SIZE, noop_delegate());

    assert_eq!(sequence.kv_position(), 0);

    // First prefill chunk, applied without an emitted token.
    sequence.schedule_prefill(4, &pool).unwrap();
    sequence.apply_prefill(None, &pool).unwrap();
    assert_eq!(sequence.kv_position(), 4);

    // Second prefill chunk, applied with an emitted token.
    sequence.schedule_prefill(4, &pool).unwrap();
    sequence.apply_prefill(Some(1000), &pool).unwrap();
    assert_eq!(sequence.kv_position(), 8);

    // One decode step.
    sequence.schedule_decode(&pool).unwrap();
    sequence.apply_decode(100, &pool).unwrap();
    assert_eq!(sequence.kv_position(), 9);

    // Three speculative tokens, all accepted.
    sequence.schedule_speculative(3, &pool).unwrap();
    sequence
        .apply_speculative(&[200, 201, 202], &pool)
        .unwrap();
    assert_eq!(sequence.kv_position(), 12);
}
2061
#[test]
/// Checks block and position counters around a decode step that follows a
/// seven-token prefill.
fn test_pending_completion_staged_during_decode() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut sequence =
        SchedulableSequence::<TestMeta>::new(make_tokens(7), 10, BLOCK_SIZE, noop_delegate());

    sequence.schedule_prefill(7, &pool).unwrap();
    sequence.apply_prefill(Some(1000), &pool).unwrap();

    // Counters right after prefill.
    assert_eq!(sequence.assigned_blocks(), 1);
    assert_eq!(sequence.unassigned_blocks(), 1);
    assert_eq!(sequence.kv_position(), 7);
    assert_eq!(sequence.tail_tokens(), 1);

    // Scheduling the decode is expected to allocate one block.
    sequence.schedule_decode(&pool).unwrap();
    let expected_state = SequenceState::DecodeScheduled {
        blocks_allocated: 1,
    };
    assert_eq!(sequence.state(), expected_state);

    // Counters after the decode has been applied.
    let result = sequence.apply_decode(100, &pool).unwrap();
    assert_eq!(result, DecodeOutcome::Continue);
    assert_eq!(sequence.assigned_blocks(), 2);
    assert_eq!(sequence.unassigned_blocks(), 1);
    assert_eq!(sequence.kv_position(), 8);
    assert_eq!(sequence.tail_tokens(), 1);
}
2095
#[test]
/// Building with tokens, output limit and block size yields an idle sequence
/// that reports those same values.
fn test_builder_basic() {
    let built = SchedulableSequence::<TestMeta>::builder()
        .tokens(make_tokens(8))
        .max_output_tokens(10)
        .block_size(BLOCK_SIZE)
        .build::<TestMeta>()
        .unwrap();

    assert_eq!(built.state(), SequenceState::Idle);
    assert_eq!(built.num_input_tokens(), 8);
    assert_eq!(built.max_output_tokens(), 10);
    assert_eq!(built.block_size(), BLOCK_SIZE as usize);
}
2114
#[test]
/// A delegate supplied through the builder receives exactly one `Created`
/// event from construction.
fn test_builder_with_delegate() {
    let delegate = Arc::new(CollectingDelegate::new());
    let built = SchedulableSequence::<TestMeta>::builder()
        .tokens(make_tokens(4))
        .max_output_tokens(5)
        .block_size(BLOCK_SIZE)
        .delegate(delegate.clone())
        .build::<TestMeta>()
        .unwrap();

    assert_eq!(built.num_input_tokens(), 4);

    let recorded = delegate.events();
    assert_eq!(recorded.len(), 1);
    assert!(matches!(recorded[0], SequenceEvent::Created { .. }));
}
2132
#[test]
/// Building with only `tokens` set must return an error.
fn test_builder_missing_required_field() {
    let incomplete = SchedulableSequence::<TestMeta>::builder().tokens(make_tokens(4));

    let outcome = incomplete.build::<TestMeta>();

    assert!(outcome.is_err());
}
2142
#[test]
/// A sequence built without a delegate can run prefill, one decode step and
/// release without error.
fn test_builder_default_noop_delegate() {
    let pool = create_test_manager::<TestMeta>(20);
    let mut built = SchedulableSequence::<TestMeta>::builder()
        .tokens(make_tokens(4))
        .max_output_tokens(10)
        .block_size(BLOCK_SIZE)
        .build::<TestMeta>()
        .unwrap();

    // Prefill.
    built.schedule_prefill(4, &pool).unwrap();
    built.apply_prefill(Some(1000), &pool).unwrap();

    // One decode step.
    built.schedule_decode(&pool).unwrap();
    built.apply_decode(100, &pool).unwrap();

    built.release().unwrap();
}
2160}