dynamo_runtime/pipeline/network/
codec.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Codec Module
5//!
6//! Codec map structure into blobs of bytes and streams of bytes.
7//!
8//! In this module, we define three primary codec used to issue single, two-part or multi-part messages,
9//! on a byte stream.
10
11use tokio_util::{
12    bytes::{Buf, BufMut, BytesMut},
13    codec::{Decoder, Encoder},
14};
15
16mod two_part;
17
18pub use two_part::{TwoPartCodec, TwoPartMessage, TwoPartMessageType};
19
20// // Custom codec that reads a u64 length header and the message of that length
21// #[derive(Default)]
22// pub struct LengthPrefixedCodec;
23
24// impl LengthPrefixedCodec {
25//     pub fn new() -> Self {
26//         LengthPrefixedCodec {}
27//     }
28// }
29
30// impl Decoder for LengthPrefixedCodec {
31//     type Item = Vec<u8>;
32//     type Error = tokio::io::Error;
33
34//     fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
35//         // Check if enough bytes are available to read the length (u64 = 8 bytes)
36//         if src.len() < 8 {
37//             return Ok(None); // Not enough data to read the length
38//         }
39
40//         // Read the u64 length header
41//         let len = src.get_u64() as usize;
42
43//         // Check if enough bytes are available to read the full message
44//         if src.len() < len {
45//             src.reserve(len - src.len()); // Reserve space for the remaining bytes
46//             return Ok(None);
47//         }
48
49//         // Read the actual message bytes of the specified length
50//         let data = src.split_to(len).to_vec();
51//         Ok(Some(data))
52//     }
53// }
54
55// impl Encoder<Vec<u8>> for LengthPrefixedCodec {
56//     type Error = tokio::io::Error;
57
58//     fn encode(&mut self, item: Vec<u8>, dst: &mut BytesMut) -> Result<(), Self::Error> {
59//         // Write the length of the message as a u64 header
60//         dst.put_u64(item.len() as u64);
61
62//         // Write the actual message bytes
63//         dst.put_slice(&item);
64//         Ok(())
65//     }
66// }