use std::sync::Arc;
use prost::Message;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use crate::stream::create_production_snapshot_stream;
use crate::stream::encode_varint;
use crate::stream::encoded_len_varint;
/// Minimal protobuf message used as the payload type by the tests in this file.
#[derive(Clone, PartialEq, Message)]
pub struct TestMessage {
/// String payload, declared as protobuf field number 1.
#[prost(string, tag = "1")]
pub data: String,
}
/// Sends one message into the channel backing the stream returned by
/// `create_production_snapshot_stream` and checks that the stream yields that
/// message and then ends once the sender has been dropped.
#[tokio::test]
async fn test_create_production_snapshot_stream() {
    let (tx, rx) = mpsc::channel(10);
    let max_message_size = 1024 * 1024;
    let mut stream = create_production_snapshot_stream::<TestMessage>(rx, max_message_size);

    let test_msg = TestMessage {
        data: "test".to_string(),
    };
    tx.send(Ok(Arc::new(test_msg)))
        .await
        .expect("sending the test message into the channel failed");
    // Drop the only sender so the stream can observe end-of-input after the
    // queued message; the final assertion below relies on this.
    drop(tx);

    // `expect` (rather than `assert!(..is_ok())` followed by `unwrap`) makes a
    // failure print the actual error value instead of a bare boolean assertion.
    let msg = stream
        .next()
        .await
        .expect("stream ended before yielding the queued message")
        .expect("stream yielded an error instead of the queued message");
    assert_eq!(msg.data, "test");

    assert!(
        stream.next().await.is_none(),
        "stream yielded an extra item after the sender was dropped"
    );
}
/// Checks `encoded_len_varint` for values on either side of the points where
/// the expected length changes from 1 to 2 and from 2 to 3.
#[test]
fn test_encoded_len_varint() {
    // (input value, expected encoded length)
    let cases = [(0, 1), (127, 1), (128, 2), (16383, 2), (16384, 3)];
    for (value, expected_len) in cases {
        assert_eq!(encoded_len_varint(value), expected_len, "input = {}", value);
    }
}
/// Checks the exact bytes `encode_varint` writes into an empty buffer for
/// inputs expected to produce one, two and three bytes.
#[test]
fn test_encode_varint() {
    // (input value, expected buffer contents)
    let cases = [
        (127, vec![127]),
        (128, vec![0x80, 0x01]),
        (16384, vec![0x80, 0x80, 0x01]),
    ];
    for (value, expected_bytes) in cases {
        // A fresh buffer per case stands in for clearing a shared one.
        let mut encoded = Vec::new();
        encode_varint(value, &mut encoded);
        assert_eq!(encoded, expected_bytes, "input = {}", value);
    }
}