dynamo_runtime/pipeline/network/
codec.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Codec Module
17//!
18//! Codecs map structures into blobs of bytes and streams of bytes.
19//!
20//! In this module, we define three primary codecs used to issue single, two-part, or multi-part messages
21//! on a byte stream.
22
23use tokio_util::{
24    bytes::{Buf, BufMut, BytesMut},
25    codec::{Decoder, Encoder},
26};
27
28mod two_part;
29
30pub use two_part::{TwoPartCodec, TwoPartMessage, TwoPartMessageType};
31
32// // Custom codec that reads a u64 length header and the message of that length
33// #[derive(Default)]
34// pub struct LengthPrefixedCodec;
35
36// impl LengthPrefixedCodec {
37//     pub fn new() -> Self {
38//         LengthPrefixedCodec {}
39//     }
40// }
41
42// impl Decoder for LengthPrefixedCodec {
43//     type Item = Vec<u8>;
44//     type Error = tokio::io::Error;
45
46//     fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
47//         // Check if enough bytes are available to read the length (u64 = 8 bytes)
48//         if src.len() < 8 {
49//             return Ok(None); // Not enough data to read the length
50//         }
51
52//         // Read the u64 length header
53//         let len = src.get_u64() as usize;
54
55//         // Check if enough bytes are available to read the full message
56//         if src.len() < len {
57//             src.reserve(len - src.len()); // Reserve space for the remaining bytes
58//             return Ok(None);
59//         }
60
61//         // Read the actual message bytes of the specified length
62//         let data = src.split_to(len).to_vec();
63//         Ok(Some(data))
64//     }
65// }
66
67// impl Encoder<Vec<u8>> for LengthPrefixedCodec {
68//     type Error = tokio::io::Error;
69
70//     fn encode(&mut self, item: Vec<u8>, dst: &mut BytesMut) -> Result<(), Self::Error> {
71//         // Write the length of the message as a u64 header
72//         dst.put_u64(item.len() as u64);
73
74//         // Write the actual message bytes
75//         dst.put_slice(&item);
76//         Ok(())
77//     }
78// }