#![allow(dead_code)]
use tokio::net::TcpStream;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
/// Largest frame length, in bytes, that [`codec`] is configured to accept: 100 MiB.
pub const MAX_FRAME_BYTES: usize = 100 * (1 << 20);
/// Builds the [`LengthDelimitedCodec`] shared by both ends of a connection.
///
/// The codec is configured with a 4-byte (`u32`), big-endian length field at
/// offset 0, and the frame length is capped at [`MAX_FRAME_BYTES`].
#[must_use]
pub fn codec() -> LengthDelimitedCodec {
    let mut builder = LengthDelimitedCodec::builder();
    builder
        .big_endian()
        .length_field_offset(0)
        // Both calls describe the same 4-byte length field; they are kept in
        // their original relative order.
        .length_field_length(4)
        .length_field_type::<u32>()
        .max_frame_length(MAX_FRAME_BYTES);
    builder.new_codec()
}
/// Wraps `stream` in a [`Framed`] transport driven by the codec from [`codec()`].
///
/// Takes ownership of the stream; the returned value is what callers use to
/// send and receive whole frames.
#[must_use]
pub fn frame(stream: TcpStream) -> Framed<TcpStream, LengthDelimitedCodec> {
    let length_codec = codec();
    Framed::new(stream, length_codec)
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use bytes::Bytes;
    use futures_util::{SinkExt, StreamExt};
    use tokio::io::AsyncWriteExt;
    use tokio::net::{TcpListener, TcpStream};

    /// Payload sent by the client and expected verbatim on the server side.
    const PAYLOAD: &[u8] = b"hello broker";

    /// Sends one frame over a loopback TCP connection and checks that the
    /// receiving side decodes exactly the bytes that were sent.
    #[tokio::test]
    async fn roundtrips_a_frame() {
        // Bind to port 0 and read back the actual address for the client.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        // Receiving side: accept one connection and return its first frame.
        let receiver = tokio::spawn(async move {
            let (socket, _peer) = listener.accept().await.unwrap();
            let mut inbound = frame(socket);
            let first = inbound.next().await.unwrap();
            let payload = first.unwrap();
            payload.freeze()
        });

        // Sending side: connect, send one frame, then shut down the write half.
        let socket = TcpStream::connect(addr).await.unwrap();
        let mut outbound = frame(socket);
        outbound.send(Bytes::from_static(PAYLOAD)).await.unwrap();
        outbound.into_inner().shutdown().await.unwrap();

        let received = receiver.await.unwrap();
        assert!(received.as_ref() == PAYLOAD);
    }
}