dynamo-backend-common 1.3.0-dev.1

Shared runtime glue for Rust LLM backends.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! `LLMEngine` trait plus registration-metadata and output-construction helpers.
//!
//! The trait takes the same `PreprocessedRequest` / `LLMEngineOutput` types used
//! across preprocessing, routing, and the frontend — no separate data-shape
//! translation layer for Rust engines.
//!
//! Object-safety: every instance method takes `&self`. `Arc<dyn LLMEngine>` is
//! the handle `Worker` drives the lifecycle through.

use std::collections::BTreeMap;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;

use async_trait::async_trait;
use futures::stream::BoxStream;
use tokio::sync::watch;

use crate::error::DynamoError;

pub use dynamo_llm::kv_router::publisher::KvEventPublisher;
pub use dynamo_llm::protocols::common::llm_backend::LLMEngineOutput;
pub use dynamo_llm::protocols::common::preprocessor::{
    BootstrapInfo, PrefillResult, PreprocessedRequest,
};
pub use dynamo_llm::protocols::common::{
    FinishReason, OutputOptions, SamplingOptions, StopConditions,
};
pub use dynamo_protocols::types::CompletionUsage;
pub use dynamo_runtime::engine::AsyncEngineContext;

/// Per-request handle wrapping the runtime context. `Deref`s to
/// `dyn AsyncEngineContext` so engine code uses it transparently.
pub struct GenerateContext {
    inner: Arc<dyn AsyncEngineContext>,
    /// Decode-mode first-token signal. `Some` only on decode-mode requests;
    /// `None` otherwise.
    first_token: Option<watch::Sender<bool>>,
    metadata: BTreeMap<String, String>,
}

impl GenerateContext {
    pub fn new(
        inner: Arc<dyn AsyncEngineContext>,
        first_token: Option<watch::Sender<bool>>,
    ) -> Self {
        Self {
            inner,
            first_token,
            metadata: BTreeMap::new(),
        }
    }

    pub fn with_metadata(
        inner: Arc<dyn AsyncEngineContext>,
        first_token: Option<watch::Sender<bool>>,
        metadata: BTreeMap<String, String>,
    ) -> Self {
        Self {
            inner,
            first_token,
            metadata,
        }
    }

    /// Clone the underlying runtime context Arc — for spawned tasks
    /// outliving `generate`'s scope.
    pub fn inner_arc(&self) -> Arc<dyn AsyncEngineContext> {
        self.inner.clone()
    }

    /// Fire the first-token signal. Idempotent; no-op on non-decode
    /// requests. Engines normally don't need this — the framework
    /// auto-fires on the first non-empty chunk. Use only when first-token
    /// is observable via a side channel before the main stream yields.
    pub fn notify_first_token(&self) {
        if let Some(tx) = &self.first_token {
            let _ = tx.send(true);
        }
    }

    /// Framework-internal: borrow the underlying Sender for cross-boundary
    /// threading (PyO3 mirrors this handle into Python's `Context` so
    /// `notify_first_token()` fires the same signal). Rust engines should
    /// call [`notify_first_token`](Self::notify_first_token) instead.
    pub fn first_token_sender(&self) -> Option<&watch::Sender<bool>> {
        self.first_token.as_ref()
    }

    pub fn metadata(&self) -> &BTreeMap<String, String> {
        &self.metadata
    }
}

impl Deref for GenerateContext {
    type Target = dyn AsyncEngineContext;
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}

/// Registration metadata returned by [`LLMEngine::start`].
///
/// `Worker` consumes this to build a `ModelDeploymentCard` and register the
/// model with discovery. `None` on an optional field means "don't advertise":
/// the router sees no value and falls back to round-robin (for scheduling
/// hints) or its configured defaults. Engines without a traditional KV cache
/// can leave `kv_cache_block_size` and `total_kv_blocks` unset.
#[derive(Clone, Debug, Default)]
pub struct EngineConfig {
    /// Canonical model identifier (e.g. HF repo name).
    pub model: String,
    /// Public-facing model name advertised to clients. Defaults to `model`.
    pub served_model_name: Option<String>,
    /// Maximum context length the engine supports, in tokens.
    pub context_length: Option<u32>,
    /// KV cache block size, in tokens. Used by KV-aware routing. `None`
    /// means the engine has no block-structured KV cache; KV-aware routing
    /// falls back to round-robin for this backend.
    pub kv_cache_block_size: Option<u32>,
    /// Total number of KV cache blocks available to the engine. `None`
    /// means "not advertised"; the planner treats the backend as having
    /// no KV-capacity hint.
    pub total_kv_blocks: Option<u64>,
    /// Maximum number of concurrent in-flight sequences.
    pub max_num_seqs: Option<u64>,
    /// Maximum tokens the engine will process in a single batched step.
    pub max_num_batched_tokens: Option<u64>,
    /// Number of data-parallel ranks this worker hosts. Defaults to 1.
    /// The router uses this to enumerate per-rank load when scoring.
    pub data_parallel_size: Option<u32>,
    /// Global index of the first DP rank this worker hosts. Defaults to 0.
    /// Non-zero only under multi-worker DP layouts where each worker owns a
    /// sub-range (e.g. vLLM hybrid/external LB, SGLang DP-attention across
    /// multiple nodes). The router enumerates ranks
    /// `[data_parallel_start_rank, data_parallel_start_rank + data_parallel_size)`.
    pub data_parallel_start_rank: Option<u32>,
    /// Bootstrap host this prefill worker advertises to decode peers.
    ///
    /// Only meaningful for backends with a Dynamo-level host/port
    /// handshake (today: SGLang). Backends whose KV transport is
    /// internal — TRT-LLM uses TRT-LLM's transceiver, vLLM uses vLLM's
    /// `NixlConnector` — should leave this `None`.
    ///
    /// Engines that do use it set this in `start()` after the engine
    /// has resolved its bootstrap address (SGLang reads
    /// `tokenizer_manager.server_args.disaggregation_bootstrap_port`).
    /// When both `bootstrap_host` and `bootstrap_port` are `Some`,
    /// `Worker` publishes them via
    /// `ModelRuntimeConfig::disaggregated_endpoint` so the frontend's
    /// `PrefillRouter` can take its optimised "Bootstrap path" (route
    /// decode concurrent with prefill instead of waiting for prefill
    /// to drain).
    pub bootstrap_host: Option<String>,
    /// Bootstrap port for disaggregated KV transfer. See `bootstrap_host`.
    pub bootstrap_port: Option<u16>,
    /// Engine-specific metadata copied into `ModelRuntimeConfig.runtime_data`.
    pub runtime_data: HashMap<String, serde_json::Value>,
}

/// Inference engine trait.
///
/// Lifecycle:
///   1. Construct the engine (typically via a backend-specific `from_args`).
///   2. `start()` — start the engine, return `EngineConfig` metadata.
///   3. `generate()` — called for each request (concurrent calls expected).
///   4. `abort()` — called when a request is cancelled (optional, default no-op).
///   5. `cleanup()` — called once on shutdown, release all resources.
#[async_trait]
pub trait LLMEngine: Send + Sync + 'static {
    /// Start the engine and return registration metadata.
    ///
    /// After this returns, the engine MUST be ready to accept `generate()`
    /// calls. `Worker` will register the model and begin serving immediately.
    /// Use interior mutability for any state allocated here.
    ///
    /// `worker_id` is an opaque, runtime-allocated unique identifier for
    /// this worker. It is stable from `start()` onward for the worker's
    /// lifetime and unique across replicas in the cluster. Engines that
    /// need a per-worker key for cluster-wide bookkeeping (e.g. TRT-LLM's
    /// 10-bit `disagg_machine_id` snowflake field) should derive it from
    /// this value rather than hashing host/pid or asking operators for a
    /// CLI override. The internal mechanism (discovery instance ID) is
    /// not part of the contract — engines should treat it as opaque.
    ///
    /// `start()` is async and may take minutes for real backends (e.g.
    /// compiling a model graph on an accelerator). Emit
    /// `tracing::info!` checkpoints so operators see progress — this
    /// call is otherwise a silent window between process launch and
    /// endpoint serving.
    async fn start(&self, worker_id: u64) -> Result<EngineConfig, DynamoError>;

    /// Yield streaming response chunks for a single request.
    ///
    /// Called concurrently for multiple in-flight requests. The returned
    /// stream MUST poll `ctx.is_stopped()` between yields; on cancellation,
    /// emit a terminal `Ok(chunk)` with `FinishReason::Cancelled`.
    ///
    /// Stream item: `Result<LLMEngineOutput, DynamoError>`.
    ///   * `Ok(chunk)` carries normal output. Exactly one terminal `Ok`
    ///     chunk (one with `finish_reason` set) must be the last item
    ///     yielded, and no items may follow it.
    ///   * `Err(dynamo_err)` carries a typed mid-stream failure (e.g.
    ///     `BackendError::InvalidArgument`). It is itself terminal — the
    ///     framework forwards it as `Annotated::error` and stops polling
    ///     the stream. Use this instead of yielding an `Ok` chunk with
    ///     `FinishReason::Error` when you want the typed `BackendError`
    ///     variant preserved end-to-end.
    ///
    /// `completion_usage` on the terminal is optional but recommended —
    /// the frontend aggregates it when present. In debug builds, the
    /// framework wraps the stream in a validator that panics on contract
    /// violations.
    ///
    /// The returned stream is `'static`: clone or move any state from
    /// `&self` or `request` into the stream body before constructing it.
    /// Use [`chunk::token`] for non-terminal chunks and
    /// [`LLMEngineOutput::cancelled`] / `::stop` / `::length` / `::error`
    /// for terminal chunks (combine with [`LLMEngineOutputExt`] for
    /// fluent field setting).
    async fn generate(
        &self,
        request: PreprocessedRequest,
        ctx: GenerateContext,
    ) -> Result<BoxStream<'static, Result<LLMEngineOutput, DynamoError>>, DynamoError>;

    /// Abort an in-flight request (optional, default no-op).
    ///
    /// Called by the framework only when `ctx.stopped()` or `ctx.killed()`
    /// fires — i.e. when the client or operator explicitly cancels. It is
    /// NOT called when the response stream is simply dropped (e.g. TCP
    /// reset, consumer-side timeout without cancellation).
    ///
    /// For cleanup that must happen on ANY drop path (releasing an
    /// accelerator slot, freeing a request handle), put the release logic
    /// inside the `generate` stream body using RAII — a guard whose
    /// `Drop` runs when the stream is dropped, however that happens. Use
    /// `abort` only for out-of-band notifications (e.g. telling a remote
    /// scheduler to cancel compute early).
    async fn abort(&self, _ctx: Arc<dyn AsyncEngineContext>) {}

    /// Drain in-flight engine work before shutdown (optional, default no-op).
    ///
    /// Called once during graceful shutdown after the discovery unregister
    /// + grace-period sleep, but before [`cleanup`](LLMEngine::cleanup).
    /// Use it for backend-side draining that must complete while the
    /// distributed runtime (NATS / etcd) is still alive — e.g. waiting for
    /// in-flight NIXL KV transfers on prefill workers (issue #7319), so
    /// downstream decode workers don't observe a use-after-free on freed
    /// GPU memory.
    ///
    /// Failures are logged and swallowed; shutdown proceeds regardless.
    async fn drain(&self) -> Result<(), DynamoError> {
        Ok(())
    }

    /// Release all engine resources. Called exactly once.
    ///
    /// `Worker` guarantees:
    ///
    /// * `cleanup` runs after [`start`](LLMEngine::start) succeeded
    ///   *and* on shutdown — the common case.
    /// * `cleanup` also runs after `start` raised, on the partial
    ///   state the engine may have allocated before failing (inner
    ///   LLM handle, sockets, background tasks). Implementations
    ///   **must** be null-safe: guard each resource with an `is
    ///   None` / `Option::is_some` check so a partially constructed
    ///   engine can be released without panic.
    /// * `cleanup` is **not** called when `start` was never invoked
    ///   (pre-start shutdown via SIGTERM during distributed runtime
    ///   construction). Engines whose constructors allocate
    ///   resources must release them via `Drop` rather than rely on
    ///   `cleanup`.
    ///
    /// `cleanup` must also be idempotent: a second call after a
    /// successful first call must return `Ok(())` without re-entering
    /// teardown (NCCL groups and similar fail noisily on double-free).
    async fn cleanup(&self) -> Result<(), DynamoError>;

    /// KV event sources advertised by this engine, one per dp_rank.
    /// Empty by default (engine opts out of KV-aware routing).
    async fn kv_event_sources(&self) -> Result<Vec<KvEventSource>, DynamoError> {
        Ok(Vec::new())
    }

    /// Wire up Prometheus surfaces. Called once by `Worker` after
    /// [`start`](LLMEngine::start) succeeds. Default returns an empty
    /// [`MetricsBindings`] (no per-rank gauges, no foreign callbacks).
    ///
    /// Two things an engine can produce here:
    /// 1. Bridge a vendor-prefixed registry (`vllm:*`, `sglang:*`,
    ///    `trtllm_*`, `lmcache:*`) into the runtime's `/metrics` output
    ///    via [`EngineMetrics::add_expfmt_callback`](crate::metrics::EngineMetrics::add_expfmt_callback)
    ///    on `ctx.metrics`. Side-effect only.
    /// 2. Declare `dp_ranks` in [`MetricsBindings`] to opt into the
    ///    per-rank `dynamo_component_*` gauges + KV router signal. The
    ///    framework constructs a
    ///    [`SnapshotPublisher`](crate::snapshot_publisher::SnapshotPublisher)
    ///    sized to those ranks and hands it back via
    ///    [`MetricsBindings::on_publisher_ready`]. Stash the `Arc` and
    ///    call `publisher.publish(rank, snap)` from your stat-logger
    ///    thread thereafter — event-driven, no polling.
    ///
    /// Framework-owned lifecycle gauges (cleanup_time, drain_time,
    /// model_load_time) are emitted by `Worker` independent of this
    /// method — they do NOT require the engine to opt in.
    ///
    /// Errors abort startup; `cleanup` runs on the partial state. Do not
    /// retain `ctx.metrics` past return.
    async fn setup_metrics(&self, _ctx: MetricsCtx<'_>) -> Result<MetricsBindings, DynamoError> {
        Ok(MetricsBindings::default())
    }

    /// Canary payload registered with the runtime's `HealthCheckManager`.
    /// `Worker` calls this once after [`start`](LLMEngine::start). Returning
    /// `Ok(None)` (default) disables active probing — the endpoint then
    /// relies on the activity-driven notifier. Operator overrides
    /// (`DYN_HEALTH_CHECK_PAYLOAD` env / `WorkerConfig`) take precedence
    /// and fully replace this value when set.
    async fn health_check_payload(&self) -> Result<Option<serde_json::Value>, DynamoError> {
        Ok(None)
    }

    /// Semantic engine controls this engine supports. Empty by default.
    ///
    /// Engines advertise control keys and implement them via
    /// [`LLMEngine::engine_control`]. Mapping those keys onto runtime routes is
    /// owned by the unified backend layer.
    async fn supported_controls(&self) -> Result<Vec<String>, DynamoError> {
        Ok(Vec::new())
    }

    /// Handle one semantic engine-control request.
    async fn engine_control(
        &self,
        control: String,
        _body: serde_json::Value,
    ) -> Result<serde_json::Value, DynamoError> {
        Ok(serde_json::json!({
            "status": "error",
            "message": format!("unsupported engine control: {control}"),
        }))
    }
}

/// Marker key stamped on canary payloads. Handlers may inspect it to branch
/// probe-specific behavior (e.g. skip a synthetic first-yield that would
/// mask a hung engine rank). Re-exported to Python via
/// `lib/bindings/python/src/dynamo/health_check.py`.
pub const HEALTH_CHECK_KEY: &str = "_HEALTH_CHECK";

/// Invoked once with a freshly-built publisher; engine drives `publish`
/// from its own thread thereafter.
pub type OnPublisherReady =
    Box<dyn FnOnce(Arc<KvEventPublisher>) -> Result<(), DynamoError> + Send + 'static>;

/// KV event source descriptor. Two flavors: subscribe to an engine-provided
/// ZMQ PUB, or hand a publisher to the engine and let it drive `publish`
/// from its own thread (for engines whose event API blocks the caller).
pub enum KvEventSource {
    Zmq {
        endpoint: String,
        topic: String,
        dp_rank: u32,
    },
    Push {
        on_ready: OnPublisherReady,
        dp_rank: u32,
    },
}

impl KvEventSource {
    /// Data-parallel rank this source publishes for.
    pub fn dp_rank(&self) -> u32 {
        match self {
            KvEventSource::Zmq { dp_rank, .. } | KvEventSource::Push { dp_rank, .. } => *dp_rank,
        }
    }
}

/// Worker-level metrics snapshot consumed by the KV router.
///
/// `kv_used_blocks` is the primary load signal the router scores against.
/// `Option` because the engine may not have a snapshot yet on the first few
/// ticks after `start()`; `Worker` skips the publish call in that case.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Metrics {
    /// Number of KV blocks currently occupied across all in-flight requests.
    pub kv_used_blocks: Option<u64>,
}

/// Rich per-rank snapshot driving both the router-input signal and the
/// per-rank `dynamo_component_*` gauges.
///
/// Engines call
/// [`SnapshotPublisher::publish`](crate::snapshot_publisher::SnapshotPublisher::publish)
/// with a fresh `ComponentSnapshot` from their stat-logger thread —
/// event-driven, no polling. The publisher atomically updates both
/// consumers inline (Rust gauges + NATS router signal).
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct ComponentSnapshot {
    pub kv_used_blocks: u64,
    pub kv_total_blocks: u64,
    /// Fractional cache usage, 0.0..1.0.
    pub gpu_cache_usage: f32,
    /// Fractional prefix cache hit rate, 0.0..1.0.
    ///
    /// Tri-state:
    /// - `Some(x)`: engine measured a hit rate this interval (publish as gauge).
    /// - `None`: no data yet OR engine has no prefix cache. The
    ///   gauge is NOT updated — distinguishes "0% hits" (which is a
    ///   legitimate measurement) from "we never measured."
    ///
    /// Each backend computes from its native counters
    /// (vLLM: `PrefixCacheStats.hits/queries`,
    ///  SGLang: `kv_metrics.cache_hit_rate_perc`,
    ///  TRT-LLM: `kv_stats["cacheHitRate"]`).
    pub kv_cache_hit_rate: Option<f32>,
    pub dp_rank: u32,
}

/// Context handed to [`LLMEngine::setup_metrics`].
pub struct MetricsCtx<'a> {
    pub model: &'a str,
    pub component: &'a str,
    pub model_load_time_seconds: f64,
    /// Use this to bridge a vendor-prefixed Prometheus registry into the
    /// runtime's `/metrics` output via `add_expfmt_callback`. Do NOT
    /// retain past the call's return.
    pub metrics: &'a crate::metrics::EngineMetrics,
}

/// Invoked once with a freshly-built [`SnapshotPublisher`]; engine drives
/// `publish(rank, snapshot)` from its own stat-logger threads thereafter.
///
/// Mirror of [`OnPublisherReady`] for the KV-event Push flavor — same
/// "framework constructs, engine writes" handoff pattern.
pub type OnSnapshotPublisherReady = Box<
    dyn FnOnce(Arc<crate::snapshot_publisher::SnapshotPublisher>) -> Result<(), DynamoError>
        + Send
        + 'static,
>;

/// What an engine returns from [`LLMEngine::setup_metrics`].
///
/// - `dp_ranks`: the data-parallel ranks this engine will publish
///   snapshots for. Stable for the engine's lifetime. Empty = opt out.
/// - `on_publisher_ready`: invoked exactly once with the constructed
///   `SnapshotPublisher`. Engine stashes it and calls
///   `publisher.publish(rank, snap)` from its stat-logger thereafter.
///
/// Foreign-registry expfmt callbacks (vLLM/SGLang/TRT-LLM vendor metrics)
/// are wired as a side effect on `ctx.metrics` in `setup_metrics` — they
/// don't flow through this struct.
#[derive(Default)]
pub struct MetricsBindings {
    pub dp_ranks: Vec<u32>,
    pub on_publisher_ready: Option<OnSnapshotPublisherReady>,
}

/// Non-terminal chunk constructor. Terminal chunks come from upstream
/// [`LLMEngineOutput::cancelled`] / `::stop` / `::length` / `::error`.
pub mod chunk {
    use super::LLMEngineOutput;

    /// Non-terminal chunk carrying a single token.
    pub fn token(id: u32) -> LLMEngineOutput {
        LLMEngineOutput {
            token_ids: vec![id],
            ..Default::default()
        }
    }
}

/// Fluent setters for [`LLMEngineOutput`] — combine with upstream
/// constructors (`LLMEngineOutput::length()`, `::cancelled()`, etc.) to
/// avoid the `let mut output = ...; output.field = ...;` pattern.
///
/// ```ignore
/// use dynamo_backend_common::{LLMEngineOutput, LLMEngineOutputExt, usage};
///
/// yield LLMEngineOutput::length()
///     .with_tokens(vec![final_id])
///     .with_usage(usage(prompt_len, n));
/// ```
pub trait LLMEngineOutputExt: Sized {
    /// Replace `token_ids`.
    fn with_tokens(self, tokens: Vec<u32>) -> Self;
    /// Attach usage stats.
    fn with_usage(self, usage: CompletionUsage) -> Self;
}

impl LLMEngineOutputExt for LLMEngineOutput {
    fn with_tokens(mut self, tokens: Vec<u32>) -> Self {
        self.token_ids = tokens;
        self
    }
    fn with_usage(mut self, usage: CompletionUsage) -> Self {
        self.completion_usage = Some(usage);
        self
    }
}

/// Build a [`CompletionUsage`] from prompt and completion counts.
/// `total_tokens` saturates on overflow (realistic LLM contexts are far
/// from `u32::MAX`).
pub fn usage(prompt_tokens: u32, completion_tokens: u32) -> CompletionUsage {
    CompletionUsage {
        prompt_tokens,
        completion_tokens,
        total_tokens: prompt_tokens.saturating_add(completion_tokens),
        prompt_tokens_details: None,
        completion_tokens_details: None,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn chunk_token_sets_only_token_ids() {
        let c = chunk::token(42);
        assert_eq!(c.token_ids, vec![42]);
        assert!(c.finish_reason.is_none());
        assert!(c.completion_usage.is_none());
    }

    #[test]
    fn ext_with_tokens_and_with_usage() {
        let terminal = LLMEngineOutput::length()
            .with_tokens(vec![1, 2, 3])
            .with_usage(usage(10, 3));
        assert_eq!(terminal.token_ids, vec![1, 2, 3]);
        assert!(matches!(terminal.finish_reason, Some(FinishReason::Length)));
        assert_eq!(terminal.completion_usage.unwrap().total_tokens, 13);
    }

    #[test]
    fn usage_sums_totals() {
        let u = usage(7, 11);
        assert_eq!(u.total_tokens, 18);
    }

    #[test]
    fn usage_saturates_on_overflow() {
        let u = usage(u32::MAX, 10);
        assert_eq!(u.total_tokens, u32::MAX);
    }
}