dynamo_llm/kv_router/
protocols.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::tokens::Token;
17use serde::{Deserialize, Serialize};
18
19#[derive(Debug, Clone, Serialize, Deserialize, Default)]
20pub struct RouterRequest {
21    pub tokens: Vec<Token>,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize, Default)]
25pub struct RouterResponse {
26    pub worker_id: i64,
27}
28
/// Outcome of selecting a worker for a request.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so that results can be duplicated
/// and compared directly (for example with `assert_eq!`), in line with the other public types in
/// this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkerSelectionResult {
    /// The worker id of the selected worker.
    pub worker_id: i64,

    /// The total number of blocks required to prefill the request.
    pub required_blocks: u64,

    /// The number of blocks that the selected worker may already have cached.
    /// This is an estimate, not a guarantee.
    pub overlap_blocks: usize,
}
41
42#[derive(Debug, Clone, Serialize, Deserialize, Default)]
43pub struct ForwardPassMetrics {
44    // https://lmsys.org/blog/2024-12-04-sglang-v0-4/#data-parallelism-attention-for-deepseek-models
45    // Data parallel ranks are semi-independent, so we need to track metrics at the DP level
46    pub data_parallel_rank: Option<u32>, // Optional for backwards compatibility
47    pub request_active_slots: u64,
48    pub request_total_slots: u64,
49    pub kv_active_blocks: u64,
50    pub kv_total_blocks: u64,
51    // integer from 0 to large number
52    pub num_requests_waiting: u64,
53    // percentage represented as a float from 0 to 1
54    pub gpu_cache_usage_perc: f32,
55    // percentage represented as a float from 0 to 1
56    pub gpu_prefix_cache_hit_rate: f32,
57}
58
/// Hash of a single block, computed from the block's tokens_ids, its extra_token_ids and its
/// optional lora_id.
///
/// Unlike [`ExternalSequenceBlockHash`], this hash does not incorporate the parent block.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LocalBlockHash(pub u64);
63
/// Sequence-aware hash of a block: computed from the block's tokens_ids, extra_token_ids and
/// optional lora_id, PLUS the hash of the parent block.
///
/// The hashing function is external and unknown, so the value is treated as an opaque `u64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ExternalSequenceBlockHash(pub u64);

// Conversions from raw integers, so callers can write `.into()` / `::from(..)`.
impl From<u64> for ExternalSequenceBlockHash {
    /// Wraps the raw `u64` unchanged.
    fn from(value: u64) -> Self {
        ExternalSequenceBlockHash(value)
    }
}

impl From<i64> for ExternalSequenceBlockHash {
    /// Reinterprets the bits of `value` as a `u64`.
    ///
    /// No bits are lost, so the conversion is lossless; a negative `i64` simply shows up as a
    /// large `u64`.
    fn from(value: i64) -> Self {
        ExternalSequenceBlockHash(value as u64)
    }
}
85
86/// Represents a collection of cache events and a shutdown flag.
87#[derive(Serialize, Deserialize, Debug, Clone)]
88pub struct KvCacheEvents {
89    /// A list of cache events.
90    pub events: Vec<KvCacheEvent>,
91    /// A flag indicating whether the cache is shutting down.
92    pub shutdown: bool,
93}
94
95/// Represents a single cache event with an ID and associated data.
96#[derive(Serialize, Deserialize, Debug, Clone)]
97pub struct KvCacheEvent {
98    /// The unique identifier of the event.
99    pub event_id: u64,
100    /// The data associated with the event.
101    pub data: KvCacheEventData,
102}
103
104/// Represents the data associated with a cache event.
105///
106/// Data is either stored or removed.
107#[derive(Serialize, Deserialize, Debug, Clone)]
108#[serde(rename_all = "snake_case")]
109pub enum KvCacheEventData {
110    Stored(KvCacheStoreData),
111    Removed(KvCacheRemoveData),
112    Cleared,
113}
114
115/// Represents the data associated with a stored cache event.
116#[derive(Serialize, Deserialize, Debug, Clone)]
117pub struct KvCacheStoreData {
118    /// The optional hash of the parent block.
119    pub parent_hash: Option<ExternalSequenceBlockHash>,
120    /// A list of stored blocked data.
121    pub blocks: Vec<KvCacheStoredBlockData>,
122}
123
124/// Represents data for a stored block.
125#[derive(Serialize, Deserialize, Debug, Clone)]
126pub struct KvCacheStoredBlockData {
127    /// The hash of the block.
128    pub block_hash: ExternalSequenceBlockHash,
129    /// The hash of the tokens in the block.
130    pub tokens_hash: LocalBlockHash,
131}
132
133/// Represents the data associated with a removed cache event.
134#[derive(Serialize, Deserialize, Debug, Clone)]
135pub struct KvCacheRemoveData {
136    /// A list of block hashes to remove.
137    pub block_hashes: Vec<ExternalSequenceBlockHash>,
138}
139
140impl Serialize for LocalBlockHash {
141    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
142    where
143        S: serde::Serializer,
144    {
145        serializer.serialize_u64(self.0)
146    }
147}
148
149impl<'de> Deserialize<'de> for LocalBlockHash {
150    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
151    where
152        D: serde::Deserializer<'de>,
153    {
154        let value = u64::deserialize(deserializer)?;
155        Ok(LocalBlockHash(value))
156    }
157}
158
159impl Serialize for ExternalSequenceBlockHash {
160    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
161    where
162        S: serde::Serializer,
163    {
164        serializer.serialize_u64(self.0)
165    }
166}
167
168impl<'de> Deserialize<'de> for ExternalSequenceBlockHash {
169    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
170    where
171        D: serde::Deserializer<'de>,
172    {
173        let value = u64::deserialize(deserializer)?;
174        Ok(ExternalSequenceBlockHash(value))
175    }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json;

    /// A `LocalBlockHash` is written as a bare JSON number and round-trips.
    #[test]
    fn test_local_block_hash_serialization() {
        let hash = LocalBlockHash(12345);
        let serialized = serde_json::to_string(&hash).unwrap();
        assert_eq!(serialized, "12345");

        let deserialized: LocalBlockHash = serde_json::from_str(&serialized).unwrap();
        assert_eq!(deserialized, hash);
    }

    /// An `ExternalSequenceBlockHash` is written as a bare JSON number and round-trips.
    #[test]
    fn test_external_sequence_block_hash_serialization() {
        let hash = ExternalSequenceBlockHash(67890);
        let serialized = serde_json::to_string(&hash).unwrap();
        assert_eq!(serialized, "67890");

        let deserialized: ExternalSequenceBlockHash = serde_json::from_str(&serialized).unwrap();
        assert_eq!(deserialized, hash);
    }

    /// Hashes use the full `u64` range, so the largest value must survive a round trip too.
    #[test]
    fn test_block_hash_max_value_round_trip() {
        let local = LocalBlockHash(u64::MAX);
        let serialized = serde_json::to_string(&local).unwrap();
        assert_eq!(serialized, "18446744073709551615");
        let deserialized: LocalBlockHash = serde_json::from_str(&serialized).unwrap();
        assert_eq!(deserialized, local);

        let external = ExternalSequenceBlockHash(u64::MAX);
        let serialized = serde_json::to_string(&external).unwrap();
        assert_eq!(serialized, "18446744073709551615");
        let deserialized: ExternalSequenceBlockHash = serde_json::from_str(&serialized).unwrap();
        assert_eq!(deserialized, external);
    }

    /// `From<u64>` wraps the value; `From<i64>` reinterprets the bits, so negatives become large.
    #[test]
    fn test_external_sequence_block_hash_from_integers() {
        assert_eq!(
            ExternalSequenceBlockHash::from(42u64),
            ExternalSequenceBlockHash(42)
        );
        assert_eq!(
            ExternalSequenceBlockHash::from(42i64),
            ExternalSequenceBlockHash(42)
        );
        assert_eq!(
            ExternalSequenceBlockHash::from(-1i64),
            ExternalSequenceBlockHash(u64::MAX)
        );
        assert_eq!(ExternalSequenceBlockHash::from(i64::MIN).0, 1u64 << 63);
    }

    /// A full `KvCacheEvents` batch with a `Stored` event round-trips through JSON.
    #[test]
    fn test_kv_cache_events_serialization() {
        let event_data = KvCacheEventData::Stored(KvCacheStoreData {
            parent_hash: Some(ExternalSequenceBlockHash(1)),
            blocks: vec![KvCacheStoredBlockData {
                block_hash: ExternalSequenceBlockHash(2),
                tokens_hash: LocalBlockHash(3),
            }],
        });

        let event = KvCacheEvent {
            event_id: 1,
            data: event_data,
        };

        let events = KvCacheEvents {
            events: vec![event],
            shutdown: false,
        };

        let serialized = serde_json::to_string(&events).unwrap();
        // The variant tag is part of the wire format: it must be the snake_case name.
        assert!(serialized.contains("\"stored\""));

        let deserialized: KvCacheEvents = serde_json::from_str(&serialized).unwrap();

        assert_eq!(deserialized.events.len(), 1);
        assert_eq!(deserialized.events[0].event_id, 1);
        if let KvCacheEventData::Stored(store_data) = &deserialized.events[0].data {
            assert_eq!(store_data.parent_hash.unwrap().0, 1);
            assert_eq!(store_data.blocks.len(), 1);
            assert_eq!(store_data.blocks[0].block_hash.0, 2);
            assert_eq!(store_data.blocks[0].tokens_hash.0, 3);
        } else {
            panic!("Expected KvCacheEventData::Stored variant");
        }
        assert!(!deserialized.shutdown);
    }

    /// The payload-free `Cleared` variant is written as the plain string `"cleared"`.
    #[test]
    fn test_kv_cache_cleared_serialization() {
        let serialized = serde_json::to_string(&KvCacheEventData::Cleared).unwrap();
        assert_eq!(serialized, "\"cleared\"");

        let deserialized: KvCacheEventData = serde_json::from_str(&serialized).unwrap();
        assert!(matches!(deserialized, KvCacheEventData::Cleared));
    }

    /// `KvCacheRemoveData` round-trips and keeps the order of its block hashes.
    #[test]
    fn test_kv_cache_remove_data_serialization() {
        let remove_data = KvCacheRemoveData {
            block_hashes: vec![ExternalSequenceBlockHash(4), ExternalSequenceBlockHash(5)],
        };

        let serialized = serde_json::to_string(&remove_data).unwrap();
        let deserialized: KvCacheRemoveData = serde_json::from_str(&serialized).unwrap();

        assert_eq!(deserialized.block_hashes.len(), 2);
        assert_eq!(deserialized.block_hashes[0].0, 4);
        assert_eq!(deserialized.block_hashes[1].0, 5);
    }
}