Skip to main content

kvbm_logical/integrations/
scheduled.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! # Schedulable Sequence
5//!
6//! Two-phase schedule/apply layer for LLM inference on top of
7//! [`RequestSequence`](crate::RequestSequence).
8//!
//! [`SchedulableSequence`] enforces a state-machine protocol for prefill,
//! decode, and speculative decode operations, tracks KV position, and
//! reports every lifecycle transition to a [`SequenceDelegate`] for observability.
12//!
13//! ## State machine
14//!
15//! ```text
//!          schedule_prefill        apply_prefill
//!  Idle ──────────────────► PrefillScheduled ─────► Idle
//!   │
//!   │    schedule_decode          apply_decode
//!   ├──────────────────► DecodeScheduled ─────────► Idle
//!   │
//!   │   schedule_speculative    apply_speculative
//!   └──────────────────► SpeculativeScheduled ────► Idle
//!
//!  PrefillScheduled | DecodeScheduled | SpeculativeScheduled
//!            │   revert_schedule
//!            └──────────────────────────► Idle
27//! ```
28//!
29//! Every `schedule_*` call validates preconditions and pre-allocates blocks.
30//! Every `apply_*` call commits the operation (appends tokens, registers
31//! blocks). `revert_schedule` undoes a schedule without applying,
32//! LIFO-releasing pre-allocated blocks.
33//!
34//! ## Dangling token tracking
35//!
36//! `SchedulableSequence` tracks which tokens have had their KV computed via
37//! `kv_position`. The difference `total_tokens - kv_position` gives the
38//! **tail token count** -- tokens whose KV hasn't been computed yet.
39//!
40//! After prefill with a generated token, `tail_tokens() == 1` (the first
41//! generated token is "dangling"). After each decode or speculative step,
42//! the count remains 1 (the newest token replaces the old dangling one).
43//!
44//! `schedule_decode` and `schedule_speculative` enforce exactly 1 tail
45//! token as a precondition.
46//!
47//! ## Typical lifecycle
48//!
49//! ```ignore
50//! use kvbm_logical::SchedulableSequence;
51//!
52//! let tokens: Vec<u32> = (0..8).collect();
53//! let mut seq = SchedulableSequence::<MyMeta>::builder()
54//!     .tokens(tokens)
55//!     .max_output_tokens(10)
56//!     .block_size(4)
57//!     .delegate(my_delegate)  // optional — defaults to NoopDelegate
58//!     .build()?;
59//!
60//! // 1. Optional prefix matching
61//! let matched = seq.match_and_add_prefix(&manager)?;
62//!
63//! // 2. Prefill (single chunk)
64//! seq.schedule_prefill(8 - matched * 4, &manager)?;
65//! seq.apply_prefill(Some(first_generated_token), &manager)?;
66//! // kv_position = 8, tail_tokens = 1
67//!
68//! // 3. Decode loop
69//! while !seq.is_complete() {
70//!     seq.schedule_decode(&manager)?;
71//!     let token = model.forward(&seq);
72//!     let outcome = seq.apply_decode(token, &manager)?;
73//!     // outcome: Continue | BlockCompleted | MaxLength | BlockCompletedAndMaxLength
74//! }
75//!
76//! // 4. Release
77//! seq.release()?;
78//! ```
79//!
80//! ## Chunked prefill
81//!
82//! Split prefill across multiple chunks. Only the **final** chunk (the one
83//! that reaches `num_input_tokens`) must provide a generated token.
84//!
85//! ```ignore
86//! // Chunk 1 (non-final): no token
87//! seq.schedule_prefill(4, &manager)?;
88//! seq.apply_prefill(None, &manager)?;
89//!
90//! // Chunk 2 (final): must provide first generated token
91//! seq.schedule_prefill(4, &manager)?;
92//! seq.apply_prefill(Some(first_token), &manager)?;
93//! ```
94//!
95//! ## Speculative decode
96//!
//! Schedule a batch of draft tokens (clamped to the remaining output budget),
//! then accept a prefix of them. Blocks pre-allocated for rejected drafts are
//! automatically released.
99//!
100//! ```ignore
101//! seq.schedule_speculative(5, &manager)?;
102//! // Model verifies draft tokens, accepts first 3
103//! let outcome = seq.apply_speculative(&[tok1, tok2, tok3], &manager)?;
104//! // Excess blocks LIFO-dropped, tail_tokens still 1
105//! ```
106//!
107//! ## Preemption
108//!
109//! Release and later reacquire blocks. Prefix cache hits reduce
110//! re-computation cost.
111//!
112//! ```ignore
113//! seq.release()?;
114//! // ... later ...
115//! let success = seq.reacquire(&manager)?;
116//! // Reacquire does not allocate a generation block;
117//! // the next schedule_decode handles that.
118//! seq.schedule_decode(&manager)?;
119//! ```
120//!
121//! ## Error handling
122//!
123//! | Error                     | When                                              |
124//! |---------------------------|---------------------------------------------------|
//! | `ScheduleError::NotIdle`  | `schedule_*` or `match_and_add_prefix` called while not Idle |
126//! | `ScheduleError::PrefillNotComplete` | Decode/speculative before prefill done  |
127//! | `ScheduleError::PrefillComplete` | `schedule_prefill` after all input processed |
128//! | `ScheduleError::PrefillOverrun` | Chunk would exceed input token count        |
129//! | `ScheduleError::AllocationFailed` | Not enough blocks in the manager          |
130//! | `ScheduleError::GenerationComplete` | Already hit `max_output_tokens`         |
131//! | `ScheduleError::WrongDanglingCount` | Tail tokens != 1 for decode/speculative |
132//! | `ApplyError::WrongState`  | `apply_*` called in wrong state                   |
133//! | `ApplyError::TokenOnNonFinalChunk` | Token provided on non-final prefill chunk |
134//! | `ApplyError::MissingTokenOnFinalChunk` | Final prefill chunk missing token    |
135//! | `ApplyError::AcceptedExceedsScheduled` | More accepted than draft tokens     |
136//! | `ApplyError::AppendExceedsRemaining` | `append_tokens` exceeds output budget |
137//!
138//! ## Event delegate
139//!
140//! Every lifecycle transition is dispatched to a caller-provided
//! [`SequenceDelegate`] via `on_event`. The events are `Created`,
//! `PrefixMatched`, `PrefillScheduled`, `PrefillApplied`, `DecodeScheduled`,
//! `DecodeApplied`, `SpeculativeScheduled`, `SpeculativeApplied`,
//! `ScheduleReverted`, `UnassignedDropped`, `Released`, and `Reacquired`.
145//!
146//! When no delegate is provided (via the builder or `new(None)`), a
147//! [`NoopDelegate`] is used that silently discards all events.
148
149use std::sync::Arc;
150
151use derive_builder::Builder;
152
153use crate::blocks::BlockMetadata;
154use crate::manager::BlockManager;
155
156use super::request::RequestSequence;
157use dynamo_tokens::Token;
158
159// =============================================================================
160// State types
161// =============================================================================
162
/// Scheduling phase of a [`SchedulableSequence`].
///
/// A sequence rests in `Idle` between operations. Each `schedule_*` call moves
/// it into the matching `*Scheduled` variant. The paired `apply_*` call, or
/// `revert_schedule`, brings it back to `Idle`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SequenceState {
    /// No operation is in flight; `schedule_*` calls are accepted.
    Idle,
    /// A prefill chunk is scheduled but not yet applied.
    PrefillScheduled {
        /// Input tokens covered by the scheduled chunk.
        num_tokens: usize,
        /// Blocks newly allocated by `schedule_prefill` (popped on revert).
        blocks_allocated: usize,
    },
    /// A single decode step is scheduled but not yet applied.
    DecodeScheduled {
        /// Blocks newly allocated by `schedule_decode` (popped on revert).
        blocks_allocated: usize,
    },
    /// A speculative decode is scheduled but not yet applied.
    SpeculativeScheduled {
        /// Draft tokens scheduled (after clamping to the output budget).
        num_tokens: usize,
        /// Blocks newly allocated by `schedule_speculative` (popped on revert).
        blocks_allocated: usize,
    },
}
179
/// Result of a committed decode step, returned by
/// [`apply_decode`](SchedulableSequence::apply_decode) and
/// [`apply_speculative`](SchedulableSequence::apply_speculative).
///
/// It combines two independent facts:
/// - whether the appended token(s) crossed a block boundary;
/// - whether the sequence has reached its output-token limit.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DecodeOutcome {
    /// No block boundary was crossed and the output limit was not reached.
    Continue,
    /// At least one block was completed; generation may continue.
    BlockCompleted,
    /// The output limit was reached without completing a block.
    MaxLength,
    /// A block was completed and the output limit was reached.
    BlockCompletedAndMaxLength,
}
189
190/// Append-only event recording a lifecycle transition.
191#[derive(Debug, Clone, PartialEq, Eq)]
192pub enum SequenceEvent {
193    Created {
194        num_input_tokens: usize,
195        max_output_tokens: usize,
196        block_size: usize,
197    },
198    PrefixMatched {
199        blocks_matched: usize,
200    },
201    PrefillScheduled {
202        num_tokens: usize,
203        blocks_allocated: usize,
204    },
205    PrefillApplied {
206        num_tokens: usize,
207        blocks_registered: usize,
208        token_emitted: bool,
209    },
210    DecodeScheduled {
211        blocks_allocated: usize,
212    },
213    DecodeApplied {
214        token: Token,
215        block_completed: bool,
216    },
217    SpeculativeScheduled {
218        num_tokens: usize,
219        blocks_allocated: usize,
220    },
221    SpeculativeApplied {
222        accepted: usize,
223        scheduled: usize,
224        blocks_released: usize,
225    },
226    ScheduleReverted {
227        blocks_released: usize,
228    },
229    UnassignedDropped {
230        count: usize,
231    },
232    Released,
233    Reacquired {
234        prefix_matched: usize,
235        success: bool,
236    },
237}
238
239// =============================================================================
240// Delegate
241// =============================================================================
242
243/// Callback interface for [`SchedulableSequence`] lifecycle events.
244///
245/// Implementations receive every [`SequenceEvent`] as it occurs, enabling
246/// real-time metrics, logging, or external state updates without coupling
247/// event storage to the sequence itself.
248pub trait SequenceDelegate: Send + Sync {
249    fn on_event(&self, event: &SequenceEvent);
250}
251
252/// No-op delegate that silently discards all events.
253///
254/// Used as the default when no delegate is provided to
255/// [`SchedulableSequenceBuilder`].
256pub struct NoopDelegate;
257
258impl SequenceDelegate for NoopDelegate {
259    fn on_event(&self, _event: &SequenceEvent) {}
260}
261
262// =============================================================================
263// Builder
264// =============================================================================
265
/// Parameters gathered by [`SchedulableSequenceBuilder`].
///
/// Hidden from rustdoc. Callers use the builder setters and its hand-written
/// generic `build`, which wraps the private generated `build_params`.
#[doc(hidden)]
#[derive(Builder)]
#[builder(
    name = "SchedulableSequenceBuilder",
    pattern = "owned",
    // The generated build fn is private and renamed so the public generic
    // `build::<T>()` can wrap it and produce a `SchedulableSequence<T>`.
    build_fn(private, name = "build_params", error = "anyhow::Error")
)]
pub struct SchedulableSequenceParams {
    /// Prompt (input) tokens.
    tokens: Vec<Token>,
    /// Maximum number of tokens that may be generated.
    max_output_tokens: usize,
    /// Tokens per block.
    block_size: u32,
    /// Optional event observer; `None` unless set. The setter is hand-written
    /// so callers pass the `Arc` directly instead of an `Option`.
    #[builder(default, setter(custom))]
    delegate: Option<Arc<dyn SequenceDelegate>>,
}
280
281impl SchedulableSequenceBuilder {
282    pub fn delegate(mut self, delegate: Arc<dyn SequenceDelegate>) -> Self {
283        self.delegate = Some(Some(delegate));
284        self
285    }
286
287    pub fn build<T: BlockMetadata>(self) -> anyhow::Result<SchedulableSequence<T>> {
288        let params = self.build_params()?;
289        Ok(SchedulableSequence::new(
290            params.tokens,
291            params.max_output_tokens,
292            params.block_size,
293            params.delegate,
294        ))
295    }
296}
297
298// =============================================================================
299// Errors
300// =============================================================================
301
302/// Error returned by `schedule_*` methods.
303#[derive(Debug, thiserror::Error)]
304pub enum ScheduleError {
305    #[error("expected Idle state, got {state:?}")]
306    NotIdle { state: SequenceState },
307    #[error("prefill overrun: position {position} + {num_tokens} > {num_input_tokens}")]
308    PrefillOverrun {
309        position: usize,
310        num_tokens: usize,
311        num_input_tokens: usize,
312    },
313    #[error("prefill already complete")]
314    PrefillComplete,
315    #[error("prefill not yet complete (position {position} < {num_input_tokens})")]
316    PrefillNotComplete {
317        position: usize,
318        num_input_tokens: usize,
319    },
320    #[error("allocation failed: needed {needed} blocks")]
321    AllocationFailed { needed: usize },
322    #[error("generation complete: {generated} >= {max_output}")]
323    GenerationComplete { generated: usize, max_output: usize },
324    #[error("expected {expected} dangling token(s), got {actual}")]
325    WrongDanglingCount { expected: usize, actual: usize },
326}
327
328/// Error returned by `apply_*` and `revert_schedule` methods.
329#[derive(Debug, thiserror::Error)]
330pub enum ApplyError {
331    #[error("expected {expected}, got {actual:?}")]
332    WrongState {
333        expected: &'static str,
334        actual: SequenceState,
335    },
336    #[error("token provided but prefill not completing this chunk")]
337    TokenOnNonFinalChunk,
338    #[error("accepted {accepted} tokens exceeds scheduled {scheduled}")]
339    AcceptedExceedsScheduled { accepted: usize, scheduled: usize },
340    #[error("final prefill chunk requires a generated token")]
341    MissingTokenOnFinalChunk,
342    #[error("append requested {requested} tokens but only {remaining} remain")]
343    AppendExceedsRemaining { requested: usize, remaining: usize },
344}
345
346// =============================================================================
347// SchedulableSequence
348// =============================================================================
349
/// Expands each listed `fn name(&self) -> Ret;` signature into a method that
/// forwards to the method of the same name on `self.inner`.
///
/// Attributes (doc comments included) and visibility written on each
/// signature are copied onto the generated method.
macro_rules! delegate_to_inner {
    ( $( $(#[$attr:meta])* $visibility:vis fn $method:ident(&self) -> $output:ty; )* ) => {
        $(
            $(#[$attr])*
            $visibility fn $method(&self) -> $output {
                self.inner.$method()
            }
        )*
    };
}
356
357/// Two-phase schedule/apply wrapper over [`RequestSequence`].
358///
359/// Enforces a state machine protocol:
360/// - `Idle` → `schedule_*` → `Scheduled` → `apply_*` or `revert_schedule` → `Idle`
361///
362/// Dispatches lifecycle events to a caller-provided [`SequenceDelegate`].
363pub struct SchedulableSequence<T: BlockMetadata> {
364    inner: RequestSequence<T>,
365    state: SequenceState,
366    prefill_position: usize,
367    kv_position: usize,
368    delegate: Arc<dyn SequenceDelegate>,
369}
370
371impl<T: BlockMetadata> SchedulableSequence<T> {
372    // =====================================================================
373    // Construction
374    // =====================================================================
375
376    /// Returns a builder for configuring a `SchedulableSequence`.
377    pub fn builder() -> SchedulableSequenceBuilder {
378        SchedulableSequenceBuilder::default()
379    }
380
381    /// Creates a new `SchedulableSequence` wrapping a fresh `RequestSequence`.
382    ///
383    /// If `delegate` is `None`, a [`NoopDelegate`] is used.
384    pub fn new(
385        tokens: Vec<Token>,
386        max_output_tokens: usize,
387        block_size: u32,
388        delegate: Option<Arc<dyn SequenceDelegate>>,
389    ) -> Self {
390        let inner = RequestSequence::new(tokens, max_output_tokens, block_size);
391        let delegate = delegate.unwrap_or_else(|| Arc::new(NoopDelegate));
392        delegate.on_event(&SequenceEvent::Created {
393            num_input_tokens: inner.num_input_tokens(),
394            max_output_tokens,
395            block_size: block_size as usize,
396        });
397        Self {
398            inner,
399            state: SequenceState::Idle,
400            prefill_position: 0,
401            kv_position: 0,
402            delegate,
403        }
404    }
405
406    // =====================================================================
407    // Prefix matching (Idle only)
408    // =====================================================================
409
410    /// Match and add prefix blocks from the manager's pools.
411    ///
412    /// Advances `prefill_position` by `matched_blocks * block_size`.
413    pub fn match_and_add_prefix(
414        &mut self,
415        manager: &BlockManager<T>,
416    ) -> Result<usize, ScheduleError> {
417        self.require_idle()?;
418
419        let count = self
420            .inner
421            .match_and_add_prefix(manager)
422            .unwrap_or_else(|_| panic!("prefix match should not produce duplicates"));
423
424        if count > 0 {
425            self.prefill_position += count * self.inner.block_size();
426            self.kv_position = self.prefill_position;
427        }
428
429        self.delegate.on_event(&SequenceEvent::PrefixMatched {
430            blocks_matched: count,
431        });
432
433        Ok(count)
434    }
435
436    // =====================================================================
437    // Two-phase prefill
438    // =====================================================================
439
440    /// Schedule a prefill chunk of `num_tokens` tokens.
441    ///
442    /// Allocates the blocks needed to cover `prefill_position + num_tokens`
443    /// tokens, minus already-assigned/staged/unassigned blocks.
444    pub fn schedule_prefill(
445        &mut self,
446        num_tokens: usize,
447        manager: &BlockManager<T>,
448    ) -> Result<(), ScheduleError> {
449        self.require_idle()?;
450
451        if self.is_prefill_complete() {
452            return Err(ScheduleError::PrefillComplete);
453        }
454
455        let num_input = self.inner.num_input_tokens();
456        let new_position = self.prefill_position + num_tokens;
457        if new_position > num_input {
458            return Err(ScheduleError::PrefillOverrun {
459                position: self.prefill_position,
460                num_tokens,
461                num_input_tokens: num_input,
462            });
463        }
464
465        // How many total blocks are needed to cover tokens up to new_position?
466        let bs = self.inner.block_size();
467        let total_blocks_needed = new_position.div_ceil(bs);
468
469        // How many do we already have?
470        let already_have = self.inner.assigned_blocks()
471            + self.inner.staged_blocks()
472            + self.inner.unassigned_blocks();
473
474        let to_allocate = total_blocks_needed.saturating_sub(already_have);
475
476        if to_allocate > 0 && !self.inner.allocate_blocks(to_allocate, manager) {
477            return Err(ScheduleError::AllocationFailed {
478                needed: to_allocate,
479            });
480        }
481
482        self.state = SequenceState::PrefillScheduled {
483            num_tokens,
484            blocks_allocated: to_allocate,
485        };
486
487        self.delegate.on_event(&SequenceEvent::PrefillScheduled {
488            num_tokens,
489            blocks_allocated: to_allocate,
490        });
491
492        Ok(())
493    }
494
495    /// Apply a previously scheduled prefill chunk.
496    ///
497    /// If `token` is `Some`, it is the first generation token emitted on the
498    /// final prefill chunk. Providing a token on a non-final chunk returns an
499    /// error.
500    pub fn apply_prefill(
501        &mut self,
502        token: Option<Token>,
503        manager: &BlockManager<T>,
504    ) -> Result<(), ApplyError> {
505        let (num_tokens, _blocks_allocated) = match self.state {
506            SequenceState::PrefillScheduled {
507                num_tokens,
508                blocks_allocated,
509            } => (num_tokens, blocks_allocated),
510            other => {
511                return Err(ApplyError::WrongState {
512                    expected: "PrefillScheduled",
513                    actual: other,
514                });
515            }
516        };
517
518        let new_position = self.prefill_position + num_tokens;
519        let is_final = new_position >= self.inner.num_input_tokens();
520
521        if token.is_some() && !is_final {
522            return Err(ApplyError::TokenOnNonFinalChunk);
523        }
524        if is_final && token.is_none() && self.inner.max_output_tokens() > 0 {
525            return Err(ApplyError::MissingTokenOnFinalChunk);
526        }
527
528        let blocks_registered_before = self.inner.assigned_blocks();
529
530        // Stage and register the prefill blocks
531        self.inner.complete_and_register_pending(manager);
532        self.prefill_position = new_position;
533        self.kv_position = self.prefill_position;
534
535        // If a token was provided on the final chunk, append it.
536        // The token is "dangling" — its KV hasn't been computed yet.
537        // The block it may complete is NOT registered here; it will be
538        // staged during the next apply_decode after the model forward pass.
539        let token_emitted = token.is_some();
540        if let Some(tok) = token {
541            self.inner.append_token(tok);
542        }
543
544        let blocks_registered =
545            self.inner.assigned_blocks() - blocks_registered_before + self.inner.staged_blocks();
546
547        self.state = SequenceState::Idle;
548        self.delegate.on_event(&SequenceEvent::PrefillApplied {
549            num_tokens,
550            blocks_registered,
551            token_emitted,
552        });
553
554        Ok(())
555    }
556
557    // =====================================================================
558    // Two-phase decode
559    // =====================================================================
560
561    /// Schedule a single decode step.
562    ///
563    /// Allocates blocks for both pending completions (blocks completed at
564    /// the sequence level but not yet staged, e.g. from prefill's dangling
565    /// token crossing a boundary) and the generation block.
566    pub fn schedule_decode(&mut self, manager: &BlockManager<T>) -> Result<(), ScheduleError> {
567        self.require_idle()?;
568        self.require_prefill_complete()?;
569        self.require_not_complete()?;
570        self.require_one_dangling()?;
571
572        // Pending completions: complete blocks in sequence not yet assigned/staged
573        let complete_in_seq = self.inner.complete_sequence_blocks();
574        let registered = self.inner.assigned_blocks() + self.inner.staged_blocks();
575        let pending = complete_in_seq.saturating_sub(registered);
576
577        // Need: pending (for staging after KV computed) + 1 (gen block)
578        let need = pending + 1;
579        let have = self.inner.unassigned_blocks();
580        let to_allocate = need.saturating_sub(have);
581
582        if to_allocate > 0 && !self.inner.allocate_blocks(to_allocate, manager) {
583            return Err(ScheduleError::AllocationFailed {
584                needed: to_allocate,
585            });
586        }
587
588        self.state = SequenceState::DecodeScheduled {
589            blocks_allocated: to_allocate,
590        };
591        self.delegate.on_event(&SequenceEvent::DecodeScheduled {
592            blocks_allocated: to_allocate,
593        });
594
595        Ok(())
596    }
597
598    /// Apply a previously scheduled decode step.
599    pub fn apply_decode(
600        &mut self,
601        token: Token,
602        manager: &BlockManager<T>,
603    ) -> Result<DecodeOutcome, ApplyError> {
604        let _blocks_allocated = match self.state {
605            SequenceState::DecodeScheduled { blocks_allocated } => blocks_allocated,
606            other => {
607                return Err(ApplyError::WrongState {
608                    expected: "DecodeScheduled",
609                    actual: other,
610                });
611            }
612        };
613
614        let crossed = self.inner.append_token(token);
615        let block_completed = crossed.is_some();
616
617        // Always stage pending completions — handles both:
618        // 1. Blocks completed during prefill's token append (deferred staging)
619        // 2. Block just completed by this decode token
620        self.inner.complete_and_register_pending(manager);
621
622        self.kv_position += 1;
623        self.state = SequenceState::Idle;
624        self.delegate.on_event(&SequenceEvent::DecodeApplied {
625            token,
626            block_completed,
627        });
628
629        let is_complete = self.inner.is_complete();
630        Ok(match (block_completed, is_complete) {
631            (false, false) => DecodeOutcome::Continue,
632            (true, false) => DecodeOutcome::BlockCompleted,
633            (false, true) => DecodeOutcome::MaxLength,
634            (true, true) => DecodeOutcome::BlockCompletedAndMaxLength,
635        })
636    }
637
638    // =====================================================================
639    // Speculative decode
640    // =====================================================================
641
642    /// Schedule a speculative decode of `num_draft_tokens` tokens.
643    ///
644    /// Allocates enough blocks to accommodate the draft tokens, accounting
645    /// for the current partial block state and all already-held blocks
646    /// (assigned + staged + unassigned).
647    pub fn schedule_speculative(
648        &mut self,
649        num_draft_tokens: usize,
650        manager: &BlockManager<T>,
651    ) -> Result<(), ScheduleError> {
652        self.require_idle()?;
653        self.require_prefill_complete()?;
654        self.require_not_complete()?;
655        self.require_one_dangling()?;
656
657        // Clamp to remaining output budget to prevent append_token panics.
658        let num_draft_tokens = num_draft_tokens.min(self.inner.remaining_tokens());
659
660        let bs = self.inner.block_size();
661        let future_total = self.inner.total_tokens() + num_draft_tokens;
662        let future_blocks = future_total.div_ceil(bs);
663        let have = self.inner.assigned_blocks()
664            + self.inner.staged_blocks()
665            + self.inner.unassigned_blocks();
666        let to_allocate = future_blocks.saturating_sub(have);
667
668        if to_allocate > 0 && !self.inner.allocate_blocks(to_allocate, manager) {
669            return Err(ScheduleError::AllocationFailed {
670                needed: to_allocate,
671            });
672        }
673
674        self.state = SequenceState::SpeculativeScheduled {
675            num_tokens: num_draft_tokens,
676            blocks_allocated: to_allocate,
677        };
678        self.delegate
679            .on_event(&SequenceEvent::SpeculativeScheduled {
680                num_tokens: num_draft_tokens,
681                blocks_allocated: to_allocate,
682            });
683
684        Ok(())
685    }
686
687    /// Apply a speculative decode with `accepted` tokens (a prefix of the
688    /// scheduled draft).
689    ///
690    /// Excess unassigned blocks (allocated for rejected draft tokens) are
691    /// LIFO-dropped, returning them to the pool via RAII.
692    pub fn apply_speculative(
693        &mut self,
694        accepted: &[Token],
695        manager: &BlockManager<T>,
696    ) -> Result<DecodeOutcome, ApplyError> {
697        let (scheduled_tokens, _blocks_allocated) = match self.state {
698            SequenceState::SpeculativeScheduled {
699                num_tokens,
700                blocks_allocated,
701            } => (num_tokens, blocks_allocated),
702            other => {
703                return Err(ApplyError::WrongState {
704                    expected: "SpeculativeScheduled",
705                    actual: other,
706                });
707            }
708        };
709
710        if accepted.len() > scheduled_tokens {
711            return Err(ApplyError::AcceptedExceedsScheduled {
712                accepted: accepted.len(),
713                scheduled: scheduled_tokens,
714            });
715        }
716
717        // Append accepted tokens one at a time, tracking boundary crossings
718        let mut block_completed = false;
719        for &token in accepted {
720            let crossed = self.inner.append_token(token);
721            if crossed.is_some() {
722                block_completed = true;
723            }
724        }
725
726        // Stage and register all pending completions (including any from
727        // prefill's deferred staging and blocks just completed above)
728        self.inner.complete_and_register_pending(manager);
729
730        self.kv_position += accepted.len();
731
732        // LIFO-drop excess unassigned blocks.
733        // After appending accepted tokens, the generation block (if any) is the
734        // remaining unassigned. If we over-allocated for the draft, drop excess.
735        let excess = self.lifo_drop_excess_unassigned();
736
737        self.state = SequenceState::Idle;
738        self.delegate.on_event(&SequenceEvent::SpeculativeApplied {
739            accepted: accepted.len(),
740            scheduled: scheduled_tokens,
741            blocks_released: excess,
742        });
743
744        let is_complete = self.inner.is_complete();
745        Ok(match (block_completed, is_complete) {
746            (false, false) => DecodeOutcome::Continue,
747            (true, false) => DecodeOutcome::BlockCompleted,
748            (false, true) => DecodeOutcome::MaxLength,
749            (true, true) => DecodeOutcome::BlockCompletedAndMaxLength,
750        })
751    }
752
753    // =====================================================================
754    // Revert
755    // =====================================================================
756
757    /// Revert a scheduled (but not yet applied) operation.
758    ///
759    /// LIFO-pops the `blocks_allocated` unassigned blocks that were
760    /// pre-allocated during the schedule phase. The dropped RAII guards
761    /// return blocks to the manager's pools.
762    pub fn revert_schedule(&mut self) -> Result<(), ApplyError> {
763        let blocks_to_release = match self.state {
764            SequenceState::PrefillScheduled {
765                blocks_allocated, ..
766            } => blocks_allocated,
767            SequenceState::DecodeScheduled { blocks_allocated } => blocks_allocated,
768            SequenceState::SpeculativeScheduled {
769                blocks_allocated, ..
770            } => blocks_allocated,
771            other => {
772                return Err(ApplyError::WrongState {
773                    expected: "any Scheduled state",
774                    actual: other,
775                });
776            }
777        };
778
779        self.lifo_pop_unassigned(blocks_to_release);
780
781        self.state = SequenceState::Idle;
782        self.delegate.on_event(&SequenceEvent::ScheduleReverted {
783            blocks_released: blocks_to_release,
784        });
785
786        Ok(())
787    }
788
789    // =====================================================================
790    // Explicit LIFO drop of unassigned blocks
791    // =====================================================================
792
793    /// LIFO-drop up to `count` unassigned blocks. Returns the actual number
794    /// dropped. Valid only in Idle state.
795    pub fn drop_unassigned(&mut self, count: usize) -> usize {
796        assert!(
797            self.state == SequenceState::Idle,
798            "drop_unassigned called in non-Idle state: {:?}",
799            self.state
800        );
801        let dropped = self.lifo_pop_unassigned(count);
802        if dropped > 0 {
803            self.delegate
804                .on_event(&SequenceEvent::UnassignedDropped { count: dropped });
805        }
806        dropped
807    }
808
809    // =====================================================================
810    // Lifecycle
811    // =====================================================================
812
813    /// Release all block assignments (RAII returns them to pools).
814    pub fn release(&mut self) -> Result<(), ApplyError> {
815        self.require_idle_for_apply()?;
816        self.inner.release();
817        self.delegate.on_event(&SequenceEvent::Released);
818        Ok(())
819    }
820
821    /// Re-acquire blocks from the manager after a release/preemption.
822    pub fn reacquire(&mut self, manager: &BlockManager<T>) -> Result<bool, ApplyError> {
823        self.require_idle_for_apply()?;
824        let success = self.inner.reacquire(manager);
825        let prefix_matched = self.inner.prefix_matched_blocks();
826        self.delegate.on_event(&SequenceEvent::Reacquired {
827            prefix_matched,
828            success,
829        });
830        Ok(success)
831    }
832
833    // =====================================================================
834    // Token append (no KV advancement)
835    // =====================================================================
836
837    /// Append tokens to the sequence without advancing `kv_position`.
838    /// Each appended token increases the dangling count.
839    /// Requires Idle state.
840    pub fn append_tokens(&mut self, tokens: &[Token]) -> Result<(), ApplyError> {
841        self.require_idle_for_apply()?;
842        let remaining = self.inner.remaining_tokens();
843        if tokens.len() > remaining {
844            return Err(ApplyError::AppendExceedsRemaining {
845                requested: tokens.len(),
846                remaining,
847            });
848        }
849        for &token in tokens {
850            self.inner.append_token(token);
851        }
852        Ok(())
853    }
854
855    // =====================================================================
856    // Accessors
857    // =====================================================================
858
859    /// Current scheduling state.
860    pub fn state(&self) -> SequenceState {
861        self.state
862    }
863
864    /// How many input tokens have been processed so far.
865    pub fn prefill_position(&self) -> usize {
866        self.prefill_position
867    }
868
869    /// Whether all input tokens have been processed.
870    pub fn is_prefill_complete(&self) -> bool {
871        self.prefill_position >= self.inner.num_input_tokens()
872    }
873
874    /// Number of tokens whose KV has been computed.
875    pub fn kv_position(&self) -> usize {
876        self.kv_position
877    }
878
879    /// Number of tokens whose KV hasn't been computed yet.
880    /// After prefill: 1 (the first generated token). After decode: 1 (the new token).
881    pub fn tail_tokens(&self) -> usize {
882        self.inner.total_tokens().saturating_sub(self.kv_position)
883    }
884
885    /// Reference to the delegate.
886    pub fn delegate(&self) -> &Arc<dyn SequenceDelegate> {
887        &self.delegate
888    }
889
    // Read-only accessors forwarded to the wrapped `RequestSequence`.
    // `delegate_to_inner!` is defined elsewhere; presumably each entry expands
    // to a method that calls `self.inner.<name>()` with the same signature
    // (confirm against the macro definition).

    delegate_to_inner! {
        pub fn generated_tokens(&self) -> usize;
        pub fn max_output_tokens(&self) -> usize;
        pub fn num_input_tokens(&self) -> usize;
        pub fn total_tokens(&self) -> usize;
        pub fn remaining_tokens(&self) -> usize;
        pub fn num_blocks(&self) -> usize;
        pub fn assigned_blocks(&self) -> usize;
        pub fn staged_blocks(&self) -> usize;
        pub fn unassigned_blocks(&self) -> usize;
        pub fn prefix_matched_blocks(&self) -> usize;
        pub fn block_size(&self) -> usize;
        pub fn is_complete(&self) -> bool;
    }
906
907    /// Reference to the underlying `RequestSequence`.
908    pub fn inner(&self) -> &RequestSequence<T> {
909        &self.inner
910    }
911
912    /// Mutable reference to the underlying `RequestSequence`.
913    #[allow(dead_code)]
914    pub(crate) fn inner_mut(&mut self) -> &mut RequestSequence<T> {
915        &mut self.inner
916    }
917
918    // =====================================================================
919    // Private helpers
920    // =====================================================================
921
922    fn require_idle(&self) -> Result<(), ScheduleError> {
923        if self.state != SequenceState::Idle {
924            return Err(ScheduleError::NotIdle { state: self.state });
925        }
926        Ok(())
927    }
928
929    fn require_idle_for_apply(&self) -> Result<(), ApplyError> {
930        if self.state != SequenceState::Idle {
931            return Err(ApplyError::WrongState {
932                expected: "Idle",
933                actual: self.state,
934            });
935        }
936        Ok(())
937    }
938
939    fn require_prefill_complete(&self) -> Result<(), ScheduleError> {
940        if !self.is_prefill_complete() {
941            return Err(ScheduleError::PrefillNotComplete {
942                position: self.prefill_position,
943                num_input_tokens: self.inner.num_input_tokens(),
944            });
945        }
946        Ok(())
947    }
948
949    fn require_not_complete(&self) -> Result<(), ScheduleError> {
950        if self.inner.is_complete() {
951            return Err(ScheduleError::GenerationComplete {
952                generated: self.inner.generated_tokens(),
953                max_output: self.inner.max_output_tokens(),
954            });
955        }
956        Ok(())
957    }
958
959    fn require_one_dangling(&self) -> Result<(), ScheduleError> {
960        let dangling = self.tail_tokens();
961        if dangling != 1 {
962            return Err(ScheduleError::WrongDanglingCount {
963                expected: 1,
964                actual: dangling,
965            });
966        }
967        Ok(())
968    }
969
970    /// LIFO-pop up to `count` unassigned blocks. Returns the actual count dropped.
971    fn lifo_pop_unassigned(&mut self, count: usize) -> usize {
972        let assignments = self.inner.assignments_mut();
973        let mut dropped = 0;
974        for _ in 0..count {
975            if assignments.pop_last_unassigned().is_some() {
976                dropped += 1;
977            } else {
978                break;
979            }
980        }
981        dropped
982    }
983
984    /// After speculative apply: drop any excess unassigned blocks beyond
985    /// what's needed for the current partial block (at most 1 gen block).
986    fn lifo_drop_excess_unassigned(&mut self) -> usize {
987        let bs = self.inner.block_size();
988        let total = self.inner.total_tokens();
989        // We need at most 1 unassigned (gen) block if there's a partial block in progress
990        // AND we haven't hit max output tokens.
991        let need_gen = if self.inner.is_complete() {
992            0
993        } else if !total.is_multiple_of(bs) {
994            // Partial block in progress — already have an unassigned block covering it
995            1
996        } else {
997            // On exact boundary — the last block was just completed & registered.
998            // We still keep 1 gen block for future decode unless complete.
999            1
1000        };
1001
1002        let current = self.inner.unassigned_blocks();
1003        let excess = current.saturating_sub(need_gen);
1004        self.lifo_pop_unassigned(excess)
1005    }
1006}
1007
1008impl<T: BlockMetadata> std::fmt::Debug for SchedulableSequence<T> {
1009    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1010        f.debug_struct("SchedulableSequence")
1011            .field("state", &self.state)
1012            .field("prefill_position", &self.prefill_position)
1013            .field("kv_position", &self.kv_position)
1014            .field("inner", &self.inner)
1015            .finish()
1016    }
1017}
1018
1019#[cfg(test)]
1020mod tests {
1021    use super::*;
1022    use crate::testing::{TestMeta, create_test_manager};
1023    use std::sync::Mutex;
1024
    /// Tokens per KV block for every sequence built in these tests.
    const BLOCK_SIZE: u32 = 4;
1026
1027    // =========================================================================
1028    // Test delegate
1029    // =========================================================================
1030
1031    struct CollectingDelegate {
1032        events: Mutex<Vec<SequenceEvent>>,
1033    }
1034
1035    impl CollectingDelegate {
1036        fn new() -> Self {
1037            Self {
1038                events: Mutex::new(Vec::new()),
1039            }
1040        }
1041
1042        fn events(&self) -> Vec<SequenceEvent> {
1043            self.events.lock().unwrap().clone()
1044        }
1045    }
1046
1047    impl SequenceDelegate for CollectingDelegate {
1048        fn on_event(&self, event: &SequenceEvent) {
1049            self.events.lock().unwrap().push(event.clone());
1050        }
1051    }
1052
1053    fn noop_delegate() -> Option<Arc<dyn SequenceDelegate>> {
1054        None
1055    }
1056
1057    fn make_tokens(n: usize) -> Vec<Token> {
1058        (0..n as u32).collect()
1059    }
1060
1061    // =========================================================================
1062    // Construction
1063    // =========================================================================
1064
1065    #[test]
1066    fn test_new_starts_idle() {
1067        let delegate = Arc::new(CollectingDelegate::new());
1068        let seq = SchedulableSequence::<TestMeta>::new(
1069            make_tokens(8),
1070            10,
1071            BLOCK_SIZE,
1072            Some(delegate.clone()),
1073        );
1074
1075        assert_eq!(seq.state(), SequenceState::Idle);
1076        assert_eq!(seq.prefill_position(), 0);
1077        assert_eq!(seq.kv_position(), 0);
1078        assert_eq!(seq.tail_tokens(), 8);
1079        assert_eq!(seq.num_input_tokens(), 8);
1080        assert_eq!(seq.max_output_tokens(), 10);
1081        assert_eq!(seq.block_size(), BLOCK_SIZE as usize);
1082        assert!(!seq.is_prefill_complete());
1083
1084        let events = delegate.events();
1085        assert_eq!(events.len(), 1);
1086        assert_eq!(
1087            events[0],
1088            SequenceEvent::Created {
1089                num_input_tokens: 8,
1090                max_output_tokens: 10,
1091                block_size: BLOCK_SIZE as usize,
1092            }
1093        );
1094    }
1095
1096    // =========================================================================
1097    // State machine enforcement
1098    // =========================================================================
1099
1100    #[test]
1101    fn test_schedule_prefill_requires_idle() {
1102        let manager = create_test_manager::<TestMeta>(20);
1103        let mut seq =
1104            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1105
1106        seq.schedule_prefill(4, &manager).unwrap();
1107        let err = seq.schedule_prefill(4, &manager).unwrap_err();
1108        assert!(matches!(err, ScheduleError::NotIdle { .. }));
1109    }
1110
1111    #[test]
1112    fn test_schedule_decode_requires_idle() {
1113        let manager = create_test_manager::<TestMeta>(20);
1114        let mut seq =
1115            SchedulableSequence::<TestMeta>::new(make_tokens(4), 10, BLOCK_SIZE, noop_delegate());
1116
1117        // Prefill first
1118        seq.schedule_prefill(4, &manager).unwrap();
1119        seq.apply_prefill(Some(1000), &manager).unwrap();
1120
1121        // Schedule decode
1122        seq.schedule_decode(&manager).unwrap();
1123        let err = seq.schedule_decode(&manager).unwrap_err();
1124        assert!(matches!(err, ScheduleError::NotIdle { .. }));
1125    }
1126
1127    #[test]
1128    fn test_apply_prefill_requires_scheduled() {
1129        let manager = create_test_manager::<TestMeta>(20);
1130        let mut seq =
1131            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1132
1133        let err = seq.apply_prefill(None, &manager).unwrap_err();
1134        assert!(matches!(err, ApplyError::WrongState { .. }));
1135    }
1136
1137    #[test]
1138    fn test_apply_decode_requires_scheduled() {
1139        let manager = create_test_manager::<TestMeta>(20);
1140        let mut seq =
1141            SchedulableSequence::<TestMeta>::new(make_tokens(4), 10, BLOCK_SIZE, noop_delegate());
1142
1143        let err = seq.apply_decode(100, &manager).unwrap_err();
1144        assert!(matches!(err, ApplyError::WrongState { .. }));
1145    }
1146
1147    #[test]
1148    fn test_decode_requires_prefill_complete() {
1149        let manager = create_test_manager::<TestMeta>(20);
1150        let mut seq =
1151            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1152
1153        let err = seq.schedule_decode(&manager).unwrap_err();
1154        assert!(matches!(err, ScheduleError::PrefillNotComplete { .. }));
1155    }
1156
1157    #[test]
1158    fn test_speculative_requires_prefill_complete() {
1159        let manager = create_test_manager::<TestMeta>(20);
1160        let mut seq =
1161            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1162
1163        let err = seq.schedule_speculative(3, &manager).unwrap_err();
1164        assert!(matches!(err, ScheduleError::PrefillNotComplete { .. }));
1165    }
1166
1167    // =========================================================================
1168    // Prefill
1169    // =========================================================================
1170
1171    #[test]
1172    fn test_prefill_single_chunk() {
1173        let manager = create_test_manager::<TestMeta>(20);
1174        let mut seq =
1175            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1176
1177        // Schedule and apply the full prefill (no gen block allocated)
1178        seq.schedule_prefill(8, &manager).unwrap();
1179        assert_eq!(
1180            seq.state(),
1181            SequenceState::PrefillScheduled {
1182                num_tokens: 8,
1183                blocks_allocated: 2, // 2 input blocks only
1184            }
1185        );
1186
1187        seq.apply_prefill(Some(1000), &manager).unwrap();
1188        assert_eq!(seq.state(), SequenceState::Idle);
1189        assert_eq!(seq.prefill_position(), 8);
1190        assert_eq!(seq.kv_position(), 8);
1191        assert!(seq.is_prefill_complete());
1192        assert_eq!(seq.assigned_blocks(), 2);
1193        assert_eq!(seq.unassigned_blocks(), 0); // no gen block
1194        assert_eq!(seq.tail_tokens(), 1); // token 1000
1195    }
1196
1197    #[test]
1198    fn test_prefill_chunked() {
1199        let manager = create_test_manager::<TestMeta>(20);
1200        let mut seq =
1201            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1202
1203        // Chunk 1: first 4 tokens (1 block, non-final)
1204        seq.schedule_prefill(4, &manager).unwrap();
1205        seq.apply_prefill(None, &manager).unwrap();
1206        assert_eq!(seq.prefill_position(), 4);
1207        assert_eq!(seq.kv_position(), 4);
1208        assert!(!seq.is_prefill_complete());
1209        assert_eq!(seq.assigned_blocks(), 1);
1210
1211        // Chunk 2: next 4 tokens (1 block, final — must provide token)
1212        seq.schedule_prefill(4, &manager).unwrap();
1213        seq.apply_prefill(Some(1000), &manager).unwrap();
1214        assert_eq!(seq.prefill_position(), 8);
1215        assert_eq!(seq.kv_position(), 8);
1216        assert!(seq.is_prefill_complete());
1217        assert_eq!(seq.assigned_blocks(), 2);
1218        assert_eq!(seq.unassigned_blocks(), 0); // no gen block
1219        assert_eq!(seq.tail_tokens(), 1);
1220    }
1221
1222    #[test]
1223    fn test_prefill_final_with_first_token() {
1224        let manager = create_test_manager::<TestMeta>(20);
1225        let mut seq =
1226            SchedulableSequence::<TestMeta>::new(make_tokens(4), 10, BLOCK_SIZE, noop_delegate());
1227
1228        seq.schedule_prefill(4, &manager).unwrap();
1229        seq.apply_prefill(Some(100), &manager).unwrap();
1230
1231        assert!(seq.is_prefill_complete());
1232        assert_eq!(seq.generated_tokens(), 1);
1233        assert_eq!(seq.total_tokens(), 5);
1234        assert_eq!(seq.kv_position(), 4);
1235        assert_eq!(seq.tail_tokens(), 1);
1236    }
1237
1238    #[test]
1239    fn test_prefill_token_on_non_final_rejected() {
1240        let manager = create_test_manager::<TestMeta>(20);
1241        let mut seq =
1242            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1243
1244        seq.schedule_prefill(4, &manager).unwrap();
1245        let err = seq.apply_prefill(Some(100), &manager).unwrap_err();
1246        assert!(matches!(err, ApplyError::TokenOnNonFinalChunk));
1247    }
1248
1249    #[test]
1250    fn test_prefill_overrun_rejected() {
1251        let manager = create_test_manager::<TestMeta>(20);
1252        let mut seq =
1253            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1254
1255        let err = seq.schedule_prefill(9, &manager).unwrap_err();
1256        assert!(matches!(err, ScheduleError::PrefillOverrun { .. }));
1257    }
1258
1259    #[test]
1260    fn test_prefill_allocation_failure() {
1261        let manager = create_test_manager::<TestMeta>(1); // only 1 block
1262        let mut seq =
1263            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1264
1265        let err = seq.schedule_prefill(8, &manager).unwrap_err();
1266        assert!(matches!(err, ScheduleError::AllocationFailed { .. }));
1267        // State remains idle
1268        assert_eq!(seq.state(), SequenceState::Idle);
1269    }
1270
1271    #[test]
1272    fn test_schedule_prefill_after_complete_rejected() {
1273        let manager = create_test_manager::<TestMeta>(20);
1274        let mut seq =
1275            SchedulableSequence::<TestMeta>::new(make_tokens(4), 10, BLOCK_SIZE, noop_delegate());
1276
1277        seq.schedule_prefill(4, &manager).unwrap();
1278        seq.apply_prefill(Some(1000), &manager).unwrap();
1279
1280        let err = seq.schedule_prefill(1, &manager).unwrap_err();
1281        assert!(matches!(err, ScheduleError::PrefillComplete));
1282    }
1283
1284    #[test]
1285    fn test_apply_prefill_none_on_final_rejected() {
1286        let manager = create_test_manager::<TestMeta>(20);
1287        let mut seq =
1288            SchedulableSequence::<TestMeta>::new(make_tokens(4), 10, BLOCK_SIZE, noop_delegate());
1289
1290        seq.schedule_prefill(4, &manager).unwrap();
1291        let err = seq.apply_prefill(None, &manager).unwrap_err();
1292        assert!(matches!(err, ApplyError::MissingTokenOnFinalChunk));
1293    }
1294
1295    // =========================================================================
1296    // Decode
1297    // =========================================================================
1298
1299    /// Helper: create a sequence with prefill done and first generated token.
1300    ///
1301    /// After this: `total_tokens = num_input + 1`, `generated_tokens = 1`,
1302    /// `kv_position = num_input`, `tail_tokens = 1`, `unassigned_blocks = 0`.
1303    fn prefilled_seq(
1304        num_input: usize,
1305        max_output: usize,
1306        manager: &BlockManager<TestMeta>,
1307    ) -> SchedulableSequence<TestMeta> {
1308        let mut seq = SchedulableSequence::new(
1309            make_tokens(num_input),
1310            max_output,
1311            BLOCK_SIZE,
1312            noop_delegate(),
1313        );
1314        if num_input > 0 {
1315            seq.schedule_prefill(num_input, manager).unwrap();
1316            seq.apply_prefill(Some(1000), manager).unwrap();
1317        }
1318        seq
1319    }
1320
1321    #[test]
1322    fn test_decode_continue() {
1323        let manager = create_test_manager::<TestMeta>(20);
1324        let mut seq = prefilled_seq(5, 10, &manager);
1325        // After prefill: total=6, gen=1, kv=5, assigned=1, unassigned=0
1326
1327        seq.schedule_decode(&manager).unwrap();
1328        let outcome = seq.apply_decode(100, &manager).unwrap();
1329        assert_eq!(outcome, DecodeOutcome::Continue);
1330        assert_eq!(seq.generated_tokens(), 2); // 1 from prefill + 1 decode
1331        assert_eq!(seq.state(), SequenceState::Idle);
1332    }
1333
1334    #[test]
1335    fn test_decode_block_completed() {
1336        let manager = create_test_manager::<TestMeta>(20);
1337        let mut seq = prefilled_seq(4, 10, &manager);
1338        // After prefill: total=5, kv=4, assigned=1, unassigned=0
1339
1340        // 3 decodes to reach block boundary at total=8
1341        for _ in 0..2 {
1342            seq.schedule_decode(&manager).unwrap();
1343            let outcome = seq.apply_decode(100, &manager).unwrap();
1344            assert_eq!(outcome, DecodeOutcome::Continue);
1345        }
1346        seq.schedule_decode(&manager).unwrap();
1347        let outcome = seq.apply_decode(100, &manager).unwrap();
1348        assert_eq!(outcome, DecodeOutcome::BlockCompleted);
1349        assert_eq!(seq.assigned_blocks(), 2);
1350    }
1351
1352    #[test]
1353    fn test_decode_max_length() {
1354        let manager = create_test_manager::<TestMeta>(20);
1355        let mut seq = prefilled_seq(5, 2, &manager);
1356        // After prefill: gen=1, max=2. One more decode reaches max.
1357
1358        seq.schedule_decode(&manager).unwrap();
1359        let outcome = seq.apply_decode(100, &manager).unwrap();
1360        assert_eq!(outcome, DecodeOutcome::MaxLength);
1361        assert!(seq.is_complete());
1362    }
1363
1364    #[test]
1365    fn test_decode_block_and_max() {
1366        let manager = create_test_manager::<TestMeta>(20);
1367        let mut seq = prefilled_seq(4, 4, &manager);
1368        // After prefill: total=5, gen=1, max=4. Need 3 more decodes.
1369        // At decode 3: total=8 (boundary) AND gen=4=max.
1370
1371        for _ in 0..2 {
1372            seq.schedule_decode(&manager).unwrap();
1373            seq.apply_decode(100, &manager).unwrap();
1374        }
1375        seq.schedule_decode(&manager).unwrap();
1376        let outcome = seq.apply_decode(100, &manager).unwrap();
1377        assert_eq!(outcome, DecodeOutcome::BlockCompletedAndMaxLength);
1378    }
1379
1380    #[test]
1381    fn test_decode_allocates_gen_block() {
1382        let manager = create_test_manager::<TestMeta>(20);
1383        let mut seq = prefilled_seq(4, 10, &manager);
1384        // After prefill: 1 assigned, 0 unassigned
1385
1386        // First schedule_decode always allocates 1 (gen block)
1387        seq.schedule_decode(&manager).unwrap();
1388        assert_eq!(
1389            seq.state(),
1390            SequenceState::DecodeScheduled {
1391                blocks_allocated: 1
1392            }
1393        );
1394        assert_eq!(seq.unassigned_blocks(), 1);
1395
1396        // Decode until block boundary: 3 decodes total reach total=8
1397        seq.apply_decode(100, &manager).unwrap(); // total=6
1398        seq.schedule_decode(&manager).unwrap();
1399        seq.apply_decode(101, &manager).unwrap(); // total=7
1400        seq.schedule_decode(&manager).unwrap();
1401        let outcome = seq.apply_decode(102, &manager).unwrap(); // total=8, crosses boundary
1402        assert_eq!(outcome, DecodeOutcome::BlockCompleted);
1403        assert_eq!(seq.unassigned_blocks(), 0);
1404
1405        // Next schedule_decode should allocate 1 gen block
1406        seq.schedule_decode(&manager).unwrap();
1407        assert_eq!(
1408            seq.state(),
1409            SequenceState::DecodeScheduled {
1410                blocks_allocated: 1
1411            }
1412        );
1413        assert_eq!(seq.unassigned_blocks(), 1);
1414    }
1415
1416    #[test]
1417    fn test_decode_generation_complete_rejected() {
1418        let manager = create_test_manager::<TestMeta>(20);
1419        let mut seq = prefilled_seq(5, 2, &manager);
1420        // After prefill: gen=1, max=2
1421
1422        seq.schedule_decode(&manager).unwrap();
1423        seq.apply_decode(100, &manager).unwrap();
1424        assert!(seq.is_complete()); // gen=2=max
1425
1426        let err = seq.schedule_decode(&manager).unwrap_err();
1427        assert!(matches!(err, ScheduleError::GenerationComplete { .. }));
1428    }
1429
1430    // =========================================================================
1431    // Speculative decode
1432    // =========================================================================
1433
1434    #[test]
1435    fn test_speculative_basic() {
1436        let manager = create_test_manager::<TestMeta>(20);
1437        let mut seq = prefilled_seq(8, 10, &manager);
1438        // After prefill: total=9, gen=1, kv=8, assigned=2, unassigned=0
1439
1440        // Schedule 2 draft tokens (stay within block)
1441        seq.schedule_speculative(2, &manager).unwrap();
1442        assert!(matches!(
1443            seq.state(),
1444            SequenceState::SpeculativeScheduled { num_tokens: 2, .. }
1445        ));
1446
1447        // Accept both
1448        let outcome = seq.apply_speculative(&[100, 101], &manager).unwrap();
1449        assert_eq!(outcome, DecodeOutcome::Continue);
1450        assert_eq!(seq.generated_tokens(), 3); // 1 from prefill + 2 speculative
1451        assert_eq!(seq.state(), SequenceState::Idle);
1452    }
1453
1454    #[test]
1455    fn test_speculative_partial_accept() {
1456        let manager = create_test_manager::<TestMeta>(20);
1457        let delegate = Arc::new(CollectingDelegate::new());
1458        let mut seq =
1459            SchedulableSequence::new(make_tokens(4), 10, BLOCK_SIZE, Some(delegate.clone()));
1460        seq.schedule_prefill(4, &manager).unwrap();
1461        seq.apply_prefill(Some(1000), &manager).unwrap();
1462        // After prefill: total=5, assigned=1, unassigned=0
1463
1464        let avail_before = manager.available_blocks();
1465
1466        // Schedule 4 draft tokens
1467        seq.schedule_speculative(4, &manager).unwrap();
1468
1469        // Accept only 2 → excess blocks should be released
1470        let outcome = seq.apply_speculative(&[100, 101], &manager).unwrap();
1471        assert_eq!(outcome, DecodeOutcome::Continue);
1472        assert_eq!(seq.generated_tokens(), 3); // 1 from prefill + 2
1473        assert_eq!(seq.unassigned_blocks(), 1); // keep 1 gen block
1474
1475        // Check delegate records the release
1476        let events = delegate.events();
1477        let last = events.last().unwrap();
1478        if let SequenceEvent::SpeculativeApplied {
1479            accepted,
1480            scheduled,
1481            ..
1482        } = last
1483        {
1484            assert_eq!(*accepted, 2);
1485            assert_eq!(*scheduled, 4);
1486        } else {
1487            panic!("expected SpeculativeApplied");
1488        }
1489
1490        let _ = avail_before;
1491    }
1492
1493    #[test]
1494    fn test_speculative_single_accept() {
1495        let manager = create_test_manager::<TestMeta>(20);
1496        let mut seq = prefilled_seq(4, 10, &manager);
1497
1498        seq.schedule_speculative(5, &manager).unwrap();
1499        let outcome = seq.apply_speculative(&[100], &manager).unwrap();
1500        assert_eq!(outcome, DecodeOutcome::Continue);
1501        assert_eq!(seq.generated_tokens(), 2); // 1 from prefill + 1
1502    }
1503
1504    #[test]
1505    fn test_speculative_zero_accept() {
1506        let manager = create_test_manager::<TestMeta>(20);
1507        let mut seq = prefilled_seq(4, 10, &manager);
1508
1509        let avail_before = manager.available_blocks();
1510        seq.schedule_speculative(3, &manager).unwrap();
1511        let avail_after_schedule = manager.available_blocks();
1512
1513        let outcome = seq.apply_speculative(&[], &manager).unwrap();
1514        assert_eq!(outcome, DecodeOutcome::Continue);
1515        assert_eq!(seq.generated_tokens(), 1); // 1 from prefill, 0 speculative
1516
1517        assert_eq!(seq.unassigned_blocks(), 1); // keep 1 gen block
1518        assert!(manager.available_blocks() >= avail_after_schedule);
1519        let _ = avail_before;
1520    }
1521
1522    #[test]
1523    fn test_speculative_exceeds_scheduled_rejected() {
1524        let manager = create_test_manager::<TestMeta>(20);
1525        let mut seq = prefilled_seq(4, 10, &manager);
1526
1527        seq.schedule_speculative(2, &manager).unwrap();
1528        let err = seq
1529            .apply_speculative(&[100, 101, 102], &manager)
1530            .unwrap_err();
1531        assert!(matches!(
1532            err,
1533            ApplyError::AcceptedExceedsScheduled {
1534                accepted: 3,
1535                scheduled: 2,
1536            }
1537        ));
1538    }
1539
1540    #[test]
1541    fn test_speculative_block_boundaries() {
1542        let manager = create_test_manager::<TestMeta>(20);
1543        let mut seq = prefilled_seq(7, 20, &manager);
1544        // After prefill: total=8, kv=7, assigned=1, unassigned=0
1545        // Block 1 complete at sequence level (4-6 + token 1000) but NOT registered
1546
1547        // 5 speculative tokens: registers pending block 1, crosses another boundary
1548        seq.schedule_speculative(5, &manager).unwrap();
1549        let outcome = seq
1550            .apply_speculative(&[100, 101, 102, 103, 104], &manager)
1551            .unwrap();
1552        assert_eq!(outcome, DecodeOutcome::BlockCompleted);
1553        assert_eq!(seq.generated_tokens(), 6); // 1 from prefill + 5
1554        assert_eq!(seq.assigned_blocks(), 3); // block 0 + blocks 1,2 registered
1555    }
1556
1557    // =========================================================================
1558    // Revert
1559    // =========================================================================
1560
1561    #[test]
1562    fn test_revert_prefill() {
1563        let manager = create_test_manager::<TestMeta>(20);
1564        let mut seq =
1565            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1566
1567        let avail_before = manager.available_blocks();
1568        seq.schedule_prefill(4, &manager).unwrap();
1569        assert!(manager.available_blocks() < avail_before);
1570
1571        seq.revert_schedule().unwrap();
1572        assert_eq!(seq.state(), SequenceState::Idle);
1573        assert_eq!(manager.available_blocks(), avail_before);
1574    }
1575
1576    #[test]
1577    fn test_revert_decode() {
1578        let manager = create_test_manager::<TestMeta>(20);
1579        let mut seq = prefilled_seq(4, 10, &manager);
1580        // After prefill: total=5, assigned=1, unassigned=0
1581
1582        // Decode until boundary at total=8 → unassigned drops to 0
1583        for _ in 0..3 {
1584            seq.schedule_decode(&manager).unwrap();
1585            seq.apply_decode(100, &manager).unwrap();
1586        }
1587        assert_eq!(seq.unassigned_blocks(), 0);
1588
1589        let avail_before = manager.available_blocks();
1590        seq.schedule_decode(&manager).unwrap();
1591        assert_eq!(manager.available_blocks(), avail_before - 1);
1592
1593        seq.revert_schedule().unwrap();
1594        assert_eq!(seq.state(), SequenceState::Idle);
1595        assert_eq!(manager.available_blocks(), avail_before);
1596    }
1597
1598    #[test]
1599    fn test_revert_speculative() {
1600        let manager = create_test_manager::<TestMeta>(20);
1601        let mut seq = prefilled_seq(4, 10, &manager);
1602
1603        let avail_before = manager.available_blocks();
1604        seq.schedule_speculative(4, &manager).unwrap();
1605        let allocated = avail_before - manager.available_blocks();
1606
1607        seq.revert_schedule().unwrap();
1608        assert_eq!(seq.state(), SequenceState::Idle);
1609        // Blocks allocated during schedule should be returned
1610        assert_eq!(manager.available_blocks(), avail_before);
1611        assert!(allocated > 0 || seq.unassigned_blocks() > 0);
1612    }
1613
1614    #[test]
1615    fn test_revert_returns_blocks_to_manager() {
1616        let manager = create_test_manager::<TestMeta>(20);
1617        let mut seq =
1618            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1619
1620        let avail_before = manager.available_blocks();
1621        seq.schedule_prefill(8, &manager).unwrap();
1622        let avail_scheduled = manager.available_blocks();
1623        assert!(avail_scheduled < avail_before);
1624
1625        seq.revert_schedule().unwrap();
1626        assert_eq!(manager.available_blocks(), avail_before);
1627    }
1628
1629    // =========================================================================
1630    // Drop unassigned
1631    // =========================================================================
1632
1633    #[test]
1634    fn test_drop_unassigned_lifo() {
1635        let manager = create_test_manager::<TestMeta>(20);
1636        let mut seq = prefilled_seq(4, 10, &manager);
1637        // After prefill: 1 assigned, 0 unassigned
1638
1639        // Do one decode to get 1 unassigned (gen block)
1640        seq.schedule_decode(&manager).unwrap();
1641        seq.apply_decode(100, &manager).unwrap();
1642        assert_eq!(seq.unassigned_blocks(), 1);
1643
1644        let dropped = seq.drop_unassigned(1);
1645        assert_eq!(dropped, 1);
1646        assert_eq!(seq.unassigned_blocks(), 0);
1647    }
1648
1649    #[test]
1650    fn test_drop_unassigned_partial() {
1651        let manager = create_test_manager::<TestMeta>(20);
1652        let mut seq = prefilled_seq(8, 10, &manager);
1653        // After prefill: 2 assigned, 0 unassigned
1654
1655        // Do one decode to get 1 unassigned
1656        seq.schedule_decode(&manager).unwrap();
1657        seq.apply_decode(100, &manager).unwrap();
1658        assert_eq!(seq.unassigned_blocks(), 1);
1659
1660        // Try to drop 5, but only 1 available
1661        let dropped = seq.drop_unassigned(5);
1662        assert_eq!(dropped, 1);
1663        assert_eq!(seq.unassigned_blocks(), 0);
1664    }
1665
1666    #[test]
1667    fn test_drop_unassigned_zero() {
1668        let manager = create_test_manager::<TestMeta>(20);
1669        let mut seq = prefilled_seq(4, 10, &manager);
1670
1671        // Do one decode to get 1 unassigned
1672        seq.schedule_decode(&manager).unwrap();
1673        seq.apply_decode(100, &manager).unwrap();
1674
1675        let dropped = seq.drop_unassigned(0);
1676        assert_eq!(dropped, 0);
1677        assert_eq!(seq.unassigned_blocks(), 1); // unchanged
1678    }
1679
1680    // =========================================================================
1681    // Delegate event collection
1682    // =========================================================================
1683
1684    #[test]
1685    fn test_delegate_full_lifecycle() {
1686        let manager = create_test_manager::<TestMeta>(20);
1687        let delegate = Arc::new(CollectingDelegate::new());
1688        // max_output=3: 1 from prefill + 2 decodes
1689        let mut seq = SchedulableSequence::<TestMeta>::new(
1690            make_tokens(4),
1691            3,
1692            BLOCK_SIZE,
1693            Some(delegate.clone()),
1694        );
1695
1696        // Prefill
1697        seq.schedule_prefill(4, &manager).unwrap();
1698        seq.apply_prefill(Some(1000), &manager).unwrap();
1699
1700        // Decode token 1
1701        seq.schedule_decode(&manager).unwrap();
1702        seq.apply_decode(100, &manager).unwrap();
1703
1704        // Decode token 2
1705        seq.schedule_decode(&manager).unwrap();
1706        seq.apply_decode(101, &manager).unwrap();
1707
1708        // Release
1709        seq.release().unwrap();
1710
1711        let h = delegate.events();
1712        assert_eq!(h.len(), 8);
1713
1714        assert!(matches!(h[0], SequenceEvent::Created { .. }));
1715        assert!(matches!(h[1], SequenceEvent::PrefillScheduled { .. }));
1716        assert!(matches!(h[2], SequenceEvent::PrefillApplied { .. }));
1717        assert!(matches!(h[3], SequenceEvent::DecodeScheduled { .. }));
1718        assert!(matches!(h[4], SequenceEvent::DecodeApplied { .. }));
1719        assert!(matches!(h[5], SequenceEvent::DecodeScheduled { .. }));
1720        assert!(matches!(h[6], SequenceEvent::DecodeApplied { .. }));
1721        assert!(matches!(h[7], SequenceEvent::Released));
1722    }
1723
1724    // =========================================================================
1725    // Integration: full lifecycle
1726    // =========================================================================
1727
1728    #[test]
1729    fn test_full_lifecycle_prefill_decode_release() {
1730        let manager = create_test_manager::<TestMeta>(20);
1731        // max_output=7: 1 from prefill + 6 decodes
1732        let mut seq =
1733            SchedulableSequence::<TestMeta>::new(make_tokens(6), 7, BLOCK_SIZE, noop_delegate());
1734
1735        // Prefill 6 tokens → 1 complete block + 2 partial
1736        seq.schedule_prefill(6, &manager).unwrap();
1737        seq.apply_prefill(Some(1000), &manager).unwrap();
1738
1739        assert!(seq.is_prefill_complete());
1740        assert_eq!(seq.assigned_blocks(), 1);
1741        assert_eq!(seq.unassigned_blocks(), 1); // partial-tail block from div_ceil
1742
1743        // Decode 6 tokens
1744        for i in 0..6u32 {
1745            seq.schedule_decode(&manager).unwrap();
1746            let outcome = seq.apply_decode(100 + i, &manager).unwrap();
1747            if i < 5 {
1748                match outcome {
1749                    DecodeOutcome::Continue | DecodeOutcome::BlockCompleted => {}
1750                    other => panic!("unexpected outcome at token {i}: {other:?}"),
1751                }
1752            } else {
1753                // Last token (gen=7=max)
1754                assert!(
1755                    outcome == DecodeOutcome::MaxLength
1756                        || outcome == DecodeOutcome::BlockCompletedAndMaxLength,
1757                    "last token should hit max length, got: {outcome:?}"
1758                );
1759            }
1760        }
1761
1762        assert!(seq.is_complete());
1763        assert_eq!(seq.generated_tokens(), 7);
1764        assert_eq!(seq.total_tokens(), 13);
1765
1766        seq.release().unwrap();
1767        assert_eq!(seq.assigned_blocks(), 0);
1768    }
1769
1770    #[test]
1771    fn test_preempt_and_reacquire() {
1772        let manager = create_test_manager::<TestMeta>(20);
1773        let mut seq =
1774            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1775
1776        // Prefill
1777        seq.schedule_prefill(8, &manager).unwrap();
1778        seq.apply_prefill(Some(1000), &manager).unwrap();
1779
1780        // Decode 2 tokens
1781        for _ in 0..2 {
1782            seq.schedule_decode(&manager).unwrap();
1783            seq.apply_decode(100, &manager).unwrap();
1784        }
1785        assert_eq!(seq.generated_tokens(), 3); // 1 from prefill + 2
1786
1787        // Preempt
1788        seq.release().unwrap();
1789        assert_eq!(seq.assigned_blocks(), 0);
1790
1791        // Reacquire (no gen block — allocated later by schedule_decode)
1792        let success = seq.reacquire(&manager).unwrap();
1793        assert!(success);
1794        assert_eq!(seq.assigned_blocks(), 2);
1795        assert_eq!(seq.unassigned_blocks(), 0); // no gen block from reacquire
1796        assert_eq!(seq.generated_tokens(), 3);
1797
1798        // Continue decoding
1799        seq.schedule_decode(&manager).unwrap();
1800        let outcome = seq.apply_decode(200, &manager).unwrap();
1801        assert_eq!(seq.generated_tokens(), 4);
1802        let _ = outcome;
1803    }
1804
1805    // =========================================================================
1806    // Prefix matching
1807    // =========================================================================
1808
1809    #[test]
1810    fn test_match_and_add_prefix() {
1811        let manager = create_test_manager::<TestMeta>(20);
1812        let tokens = make_tokens(8);
1813
1814        // Populate cache
1815        let seq_for_populate = crate::BlockSequence::new(tokens[..4].to_vec(), BLOCK_SIZE, None);
1816        let mutables = manager.allocate_blocks(1).unwrap();
1817        let registered: Vec<_> = mutables
1818            .into_iter()
1819            .zip(seq_for_populate.blocks().iter())
1820            .map(|(m, tb)| manager.register_block(m.complete(tb).unwrap()))
1821            .collect();
1822        drop(registered);
1823
1824        let mut seq = SchedulableSequence::<TestMeta>::new(tokens, 10, BLOCK_SIZE, noop_delegate());
1825        let matched = seq.match_and_add_prefix(&manager).unwrap();
1826        assert_eq!(matched, 1);
1827        assert_eq!(seq.prefill_position(), 4); // 1 block * 4 tokens
1828        assert_eq!(seq.kv_position(), 4); // cache hits have KV computed
1829        assert_eq!(seq.assigned_blocks(), 1);
1830
1831        // Should only need 1 more input block (no gen block from prefill)
1832        seq.schedule_prefill(4, &manager).unwrap();
1833        seq.apply_prefill(Some(1000), &manager).unwrap();
1834        assert_eq!(seq.assigned_blocks(), 2);
1835        assert_eq!(seq.unassigned_blocks(), 0); // no gen block
1836        assert_eq!(seq.tail_tokens(), 1);
1837    }
1838
1839    #[test]
1840    fn test_match_and_add_prefix_no_hits() {
1841        let manager = create_test_manager::<TestMeta>(20);
1842        let mut seq =
1843            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1844
1845        let matched = seq.match_and_add_prefix(&manager).unwrap();
1846        assert_eq!(matched, 0);
1847        assert_eq!(seq.prefill_position(), 0);
1848    }
1849
1850    // =========================================================================
1851    // Edge cases
1852    // =========================================================================
1853
1854    #[test]
1855    fn test_empty_tokens_prefill() {
1856        let manager = create_test_manager::<TestMeta>(20);
1857        let mut seq = SchedulableSequence::<TestMeta>::new(vec![], 10, BLOCK_SIZE, noop_delegate());
1858
1859        assert!(seq.is_prefill_complete());
1860
1861        // Cannot schedule prefill (already complete)
1862        let err = seq.schedule_prefill(0, &manager).unwrap_err();
1863        assert!(matches!(err, ScheduleError::PrefillComplete));
1864
1865        // Cannot schedule decode with 0 dangling tokens
1866        let err = seq.schedule_decode(&manager).unwrap_err();
1867        assert!(matches!(err, ScheduleError::WrongDanglingCount { .. }));
1868
1869        // Append initial token to create dangling, then decode
1870        seq.append_tokens(&[100]).unwrap();
1871        assert_eq!(seq.tail_tokens(), 1);
1872
1873        seq.schedule_decode(&manager).unwrap();
1874        let outcome = seq.apply_decode(101, &manager).unwrap();
1875        assert_eq!(outcome, DecodeOutcome::Continue);
1876    }
1877
1878    #[test]
1879    fn test_zero_max_output_no_gen_block() {
1880        let manager = create_test_manager::<TestMeta>(20);
1881        let mut seq =
1882            SchedulableSequence::<TestMeta>::new(make_tokens(4), 0, BLOCK_SIZE, noop_delegate());
1883
1884        seq.schedule_prefill(4, &manager).unwrap();
1885        seq.apply_prefill(None, &manager).unwrap();
1886
1887        assert_eq!(seq.assigned_blocks(), 1);
1888        assert_eq!(seq.unassigned_blocks(), 0); // no gen block
1889
1890        // Can't decode since is_complete
1891        let err = seq.schedule_decode(&manager).unwrap_err();
1892        assert!(matches!(err, ScheduleError::GenerationComplete { .. }));
1893    }
1894
1895    #[test]
1896    fn test_debug_impl() {
1897        let seq =
1898            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1899        let debug_str = format!("{seq:?}");
1900        assert!(debug_str.contains("SchedulableSequence"));
1901        assert!(debug_str.contains("Idle"));
1902    }
1903
1904    #[test]
1905    fn test_revert_idle_rejected() {
1906        let mut seq =
1907            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1908        let err = seq.revert_schedule().unwrap_err();
1909        assert!(matches!(err, ApplyError::WrongState { .. }));
1910    }
1911
1912    #[test]
1913    fn test_release_while_scheduled_rejected() {
1914        let manager = create_test_manager::<TestMeta>(20);
1915        let mut seq =
1916            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1917
1918        seq.schedule_prefill(4, &manager).unwrap();
1919        let err = seq.release().unwrap_err();
1920        assert!(matches!(err, ApplyError::WrongState { .. }));
1921    }
1922
1923    #[test]
1924    fn test_reacquire_while_scheduled_rejected() {
1925        let manager = create_test_manager::<TestMeta>(20);
1926        let mut seq =
1927            SchedulableSequence::<TestMeta>::new(make_tokens(8), 10, BLOCK_SIZE, noop_delegate());
1928
1929        seq.schedule_prefill(4, &manager).unwrap();
1930        let err = seq.reacquire(&manager).unwrap_err();
1931        assert!(matches!(err, ApplyError::WrongState { .. }));
1932    }
1933
1934    // =========================================================================
1935    // Dangling token tracking
1936    // =========================================================================
1937
1938    #[test]
1939    fn test_dangling_tokens_tracking() {
1940        let manager = create_test_manager::<TestMeta>(20);
1941        let mut seq =
1942            SchedulableSequence::<TestMeta>::new(make_tokens(8), 20, BLOCK_SIZE, noop_delegate());
1943
1944        // Before prefill: all tokens are "not yet computed"
1945        assert_eq!(seq.kv_position(), 0);
1946        assert_eq!(seq.tail_tokens(), 8);
1947
1948        // After prefill with token: 1 dangling
1949        seq.schedule_prefill(8, &manager).unwrap();
1950        seq.apply_prefill(Some(1000), &manager).unwrap();
1951        assert_eq!(seq.kv_position(), 8);
1952        assert_eq!(seq.tail_tokens(), 1);
1953
1954        // After decode: still 1 dangling
1955        seq.schedule_decode(&manager).unwrap();
1956        seq.apply_decode(100, &manager).unwrap();
1957        assert_eq!(seq.kv_position(), 9);
1958        assert_eq!(seq.tail_tokens(), 1);
1959
1960        // After speculative (accept 2): still 1 dangling
1961        seq.schedule_speculative(3, &manager).unwrap();
1962        seq.apply_speculative(&[200, 201], &manager).unwrap();
1963        assert_eq!(seq.kv_position(), 11);
1964        assert_eq!(seq.tail_tokens(), 1);
1965    }
1966
1967    #[test]
1968    fn test_decode_requires_one_dangling() {
1969        let manager = create_test_manager::<TestMeta>(20);
1970
1971        // 0 dangling: empty sequence with no tokens
1972        let mut seq = SchedulableSequence::<TestMeta>::new(vec![], 10, BLOCK_SIZE, noop_delegate());
1973        let err = seq.schedule_decode(&manager).unwrap_err();
1974        assert!(matches!(
1975            err,
1976            ScheduleError::WrongDanglingCount {
1977                expected: 1,
1978                actual: 0,
1979            }
1980        ));
1981
1982        // 2 dangling: append 2 tokens
1983        seq.append_tokens(&[100, 101]).unwrap();
1984        assert_eq!(seq.tail_tokens(), 2);
1985        let err = seq.schedule_decode(&manager).unwrap_err();
1986        assert!(matches!(
1987            err,
1988            ScheduleError::WrongDanglingCount {
1989                expected: 1,
1990                actual: 2,
1991            }
1992        ));
1993    }
1994
1995    #[test]
1996    fn test_append_tokens_creates_dangling() {
1997        let manager = create_test_manager::<TestMeta>(20);
1998        let mut seq = SchedulableSequence::<TestMeta>::new(vec![], 10, BLOCK_SIZE, noop_delegate());
1999
2000        assert_eq!(seq.tail_tokens(), 0);
2001
2002        seq.append_tokens(&[100]).unwrap();
2003        assert_eq!(seq.tail_tokens(), 1);
2004        assert_eq!(seq.total_tokens(), 1);
2005        assert_eq!(seq.kv_position(), 0);
2006
2007        // Now can schedule decode
2008        seq.schedule_decode(&manager).unwrap();
2009        seq.apply_decode(101, &manager).unwrap();
2010        assert_eq!(seq.tail_tokens(), 1);
2011        assert_eq!(seq.kv_position(), 1);
2012    }
2013
2014    #[test]
2015    fn test_append_tokens_exceeding_remaining_returns_error_without_mutation() {
2016        let mut seq = SchedulableSequence::<TestMeta>::new(vec![], 1, BLOCK_SIZE, noop_delegate());
2017
2018        let err = seq.append_tokens(&[100, 101]).unwrap_err();
2019
2020        assert!(matches!(
2021            err,
2022            ApplyError::AppendExceedsRemaining {
2023                requested: 2,
2024                remaining: 1,
2025            }
2026        ));
2027        assert_eq!(seq.generated_tokens(), 0);
2028        assert_eq!(seq.remaining_tokens(), 1);
2029        assert_eq!(seq.total_tokens(), 0);
2030        assert_eq!(seq.tail_tokens(), 0);
2031        assert_eq!(seq.kv_position(), 0);
2032    }
2033
2034    #[test]
2035    fn test_kv_position_through_lifecycle() {
2036        let manager = create_test_manager::<TestMeta>(20);
2037        let mut seq =
2038            SchedulableSequence::<TestMeta>::new(make_tokens(8), 20, BLOCK_SIZE, noop_delegate());
2039
2040        assert_eq!(seq.kv_position(), 0);
2041
2042        // Chunked prefill
2043        seq.schedule_prefill(4, &manager).unwrap();
2044        seq.apply_prefill(None, &manager).unwrap();
2045        assert_eq!(seq.kv_position(), 4);
2046
2047        seq.schedule_prefill(4, &manager).unwrap();
2048        seq.apply_prefill(Some(1000), &manager).unwrap();
2049        assert_eq!(seq.kv_position(), 8);
2050
2051        // Decode
2052        seq.schedule_decode(&manager).unwrap();
2053        seq.apply_decode(100, &manager).unwrap();
2054        assert_eq!(seq.kv_position(), 9);
2055
2056        // Speculative
2057        seq.schedule_speculative(3, &manager).unwrap();
2058        seq.apply_speculative(&[200, 201, 202], &manager).unwrap();
2059        assert_eq!(seq.kv_position(), 12);
2060    }
2061
2062    #[test]
2063    fn test_pending_completion_staged_during_decode() {
2064        let manager = create_test_manager::<TestMeta>(20);
2065        // 7 input tokens: block 0 complete (0-3), block 1 partial (4-6)
2066        let mut seq =
2067            SchedulableSequence::<TestMeta>::new(make_tokens(7), 10, BLOCK_SIZE, noop_delegate());
2068
2069        seq.schedule_prefill(7, &manager).unwrap();
2070        seq.apply_prefill(Some(1000), &manager).unwrap();
2071
2072        // After prefill: token 1000 completed block 1 (4,5,6,1000) but NOT registered
2073        assert_eq!(seq.assigned_blocks(), 1); // only block 0
2074        assert_eq!(seq.unassigned_blocks(), 1); // partial-tail block from div_ceil
2075        assert_eq!(seq.kv_position(), 7);
2076        assert_eq!(seq.tail_tokens(), 1); // token 1000
2077
2078        // schedule_decode detects the pending completion
2079        seq.schedule_decode(&manager).unwrap();
2080        assert_eq!(
2081            seq.state(),
2082            SequenceState::DecodeScheduled {
2083                blocks_allocated: 1
2084            }
2085        ); // 1 pending + 1 gen, but already had 1 unassigned
2086
2087        // apply_decode stages the pending block
2088        let outcome = seq.apply_decode(100, &manager).unwrap();
2089        assert_eq!(outcome, DecodeOutcome::Continue); // no boundary from THIS token
2090        assert_eq!(seq.assigned_blocks(), 2); // block 1 now registered
2091        assert_eq!(seq.unassigned_blocks(), 1); // gen block
2092        assert_eq!(seq.kv_position(), 8);
2093        assert_eq!(seq.tail_tokens(), 1);
2094    }
2095
2096    // =========================================================================
2097    // Builder
2098    // =========================================================================
2099
2100    #[test]
2101    fn test_builder_basic() {
2102        let seq = SchedulableSequence::<TestMeta>::builder()
2103            .tokens(make_tokens(8))
2104            .max_output_tokens(10)
2105            .block_size(BLOCK_SIZE)
2106            .build::<TestMeta>()
2107            .unwrap();
2108
2109        assert_eq!(seq.state(), SequenceState::Idle);
2110        assert_eq!(seq.num_input_tokens(), 8);
2111        assert_eq!(seq.max_output_tokens(), 10);
2112        assert_eq!(seq.block_size(), BLOCK_SIZE as usize);
2113    }
2114
2115    #[test]
2116    fn test_builder_with_delegate() {
2117        let delegate = Arc::new(CollectingDelegate::new());
2118        let seq = SchedulableSequence::<TestMeta>::builder()
2119            .tokens(make_tokens(4))
2120            .max_output_tokens(5)
2121            .block_size(BLOCK_SIZE)
2122            .delegate(delegate.clone())
2123            .build::<TestMeta>()
2124            .unwrap();
2125
2126        assert_eq!(seq.num_input_tokens(), 4);
2127
2128        let events = delegate.events();
2129        assert_eq!(events.len(), 1);
2130        assert!(matches!(events[0], SequenceEvent::Created { .. }));
2131    }
2132
2133    #[test]
2134    fn test_builder_missing_required_field() {
2135        let result = SchedulableSequence::<TestMeta>::builder()
2136            .tokens(make_tokens(4))
2137            // missing max_output_tokens and block_size
2138            .build::<TestMeta>();
2139
2140        assert!(result.is_err());
2141    }
2142
2143    #[test]
2144    fn test_builder_default_noop_delegate() {
2145        let manager = create_test_manager::<TestMeta>(20);
2146        let mut seq = SchedulableSequence::<TestMeta>::builder()
2147            .tokens(make_tokens(4))
2148            .max_output_tokens(10)
2149            .block_size(BLOCK_SIZE)
2150            .build::<TestMeta>()
2151            .unwrap();
2152
2153        // Verify the noop delegate doesn't panic — exercise the full lifecycle
2154        seq.schedule_prefill(4, &manager).unwrap();
2155        seq.apply_prefill(Some(1000), &manager).unwrap();
2156        seq.schedule_decode(&manager).unwrap();
2157        seq.apply_decode(100, &manager).unwrap();
2158        seq.release().unwrap();
2159    }
2160}