d_engine_core/test_utils/
snapshot.rs1use std::path::Path;
2
3use bytes::BufMut;
4use bytes::Bytes;
5use bytes::BytesMut;
6use crc32fast::Hasher;
7use d_engine_proto::common::LogId;
8use d_engine_proto::server::storage::SnapshotChunk;
9use d_engine_proto::server::storage::SnapshotMetadata;
10use futures::StreamExt;
11use futures::TryStreamExt;
12use futures::stream;
13use http_body::Frame;
14use http_body_util::BodyExt;
15use http_body_util::StreamBody;
16use tokio::fs::File;
17use tokio::io::AsyncWriteExt;
18use tokio::sync::mpsc;
19use tokio_stream::wrappers::ReceiverStream;
20use tonic::Code;
21use tonic::Status;
22use tracing::debug;
23
24use crate::stream::GrpcStreamDecoder;
25
26pub fn create_test_snapshot_stream<T>(chunks: Vec<T>) -> tonic::Streaming<T>
27where
28 T: prost::Message + Default + 'static,
29{
30 let byte_stream = stream::iter(chunks.into_iter().map(|chunk| {
32 let mut buf = Vec::new();
33
34 chunk
35 .encode(&mut buf)
36 .map_err(|e| Status::new(Code::Internal, format!("Encoding failed: {e}")))?;
37
38 let mut frame = BytesMut::new();
40 frame.put_u8(0); debug!("buf.len()={}", buf.len());
42
43 frame.put_u32(buf.len() as u32); frame.extend_from_slice(&buf);
45
46 Ok(frame.freeze())
47 }));
48
49 let body = StreamBody::new(
50 byte_stream
51 .map_ok(Frame::data)
52 .map_err(|e: Status| Status::new(Code::Internal, format!("Stream error: {e}"))),
53 );
54 tonic::Streaming::new_request(
55 GrpcStreamDecoder::<T>::new(),
56 body.boxed_unsync(),
57 None,
58 Some(1024 * 1024 * 1024),
59 )
60}
61
62pub async fn create_test_compressed_snapshot() -> (Vec<u8>, SnapshotMetadata) {
63 let temp_dir = tempfile::tempdir().unwrap();
65 let temp_path = temp_dir.path();
66
67 let metadata = SnapshotMetadata {
69 last_included: Some(LogId { index: 5, term: 1 }),
70 checksum: Bytes::from(vec![1; 32]),
71 };
72
73 let data_file = temp_path.join("test_data.bin");
75 tokio::fs::write(&data_file, b"test snapshot content").await.unwrap();
76
77 let metadata_bytes = bincode::serialize(&metadata).unwrap();
79 tokio::fs::write(temp_path.join("metadata.bin"), &metadata_bytes).await.unwrap();
80
81 let compressed_path = temp_path.join("snapshot.tar.gz");
83 let file = tokio::fs::File::create(&compressed_path).await.unwrap();
84 let gzip_encoder = async_compression::tokio::write::GzipEncoder::new(file);
85 let mut tar_builder = tokio_tar::Builder::new(gzip_encoder);
86
87 tar_builder.append_path_with_name(&data_file, "test_data.bin").await.unwrap();
89 tar_builder
90 .append_path_with_name(temp_path.join("metadata.bin"), "metadata.bin")
91 .await
92 .unwrap();
93
94 tar_builder.finish().await.unwrap();
96 let mut gzip_encoder = tar_builder.into_inner().await.unwrap();
97 gzip_encoder.shutdown().await.unwrap();
98
99 let compressed_data = tokio::fs::read(&compressed_path).await.unwrap();
101
102 (compressed_data, metadata)
103}
104
105#[allow(unused)]
106pub(crate) fn create_test_snapshot_stream_from_receiver<T>(
107 receiver: mpsc::Receiver<T>
108) -> tonic::Streaming<T>
109where
110 T: prost::Message + Default + 'static,
111{
112 let byte_stream = ReceiverStream::new(receiver).map(|item| {
113 let mut buf = Vec::new();
114 item.encode(&mut buf)
115 .map_err(|e| Status::new(Code::Internal, format!("Encoding failed: {e}")))?;
116
117 let mut frame = BytesMut::new();
118 frame.put_u8(0);
119 frame.put_u32(buf.len() as u32);
120 frame.extend_from_slice(&buf);
121 Ok(frame.freeze())
122 });
123
124 let body = StreamBody::new(
125 byte_stream
126 .map_ok(Frame::data)
127 .map_err(|e: Status| Status::new(Code::Internal, format!("Stream error: {e}"))),
128 );
129
130 tonic::Streaming::new_request(
131 GrpcStreamDecoder::<T>::new(),
132 body.boxed_unsync(),
133 None,
134 Some(1024 * 1024 * 1024),
135 )
136}
137
138pub fn create_test_chunk(
140 seq: u32,
141 data: &[u8],
142 leader_term: u64,
143 leader_id: u32,
144 total_chunks: u32,
145) -> SnapshotChunk {
146 SnapshotChunk {
147 leader_term,
148 leader_id,
149 seq,
150 total_chunks,
151 chunk_checksum: Bytes::from(compute_checksum(data)),
152 metadata: Some(SnapshotMetadata {
153 last_included: Some(LogId {
154 index: 100,
155 term: leader_term,
156 }),
157 checksum: Bytes::new(),
158 }),
159 data: Bytes::from(data.to_vec()),
160 }
161}
162
163fn compute_checksum(data: &[u8]) -> Vec<u8> {
165 let mut hasher = Hasher::new();
166 hasher.update(data);
167 hasher.finalize().to_be_bytes().to_vec()
168}
169
170#[allow(unused)]
172pub async fn create_fake_compressed_snapshot(
173 path: &Path,
174 content: &[u8],
175) -> std::result::Result<(), Box<dyn std::error::Error>> {
176 use async_compression::tokio::write::GzipEncoder;
177 use tokio::io::AsyncWriteExt;
178
179 let file = File::create(path).await?;
180 let mut encoder = GzipEncoder::new(file);
181 encoder.write_all(content).await?;
182 encoder.shutdown().await?;
183 Ok(())
184}
185
186#[allow(unused)]
188pub async fn create_fake_dir_compressed_snapshot(
189 path: &Path,
190 files: &[(&str, &[u8])],
191) -> std::result::Result<(), Box<dyn std::error::Error>> {
192 use async_compression::tokio::write::GzipEncoder;
193 use tokio_tar::Builder;
194
195 let file = File::create(path).await?;
196 let gzip_encoder = GzipEncoder::new(file);
197 let mut tar_builder = Builder::new(gzip_encoder);
198
199 let temp_dir = tempfile::tempdir()?;
200 for (file_name, content) in files {
201 let file_path = temp_dir.path().join(file_name);
202 tokio::fs::write(&file_path, content).await?;
203 tar_builder.append_path(&file_path).await?;
204 }
205
206 tar_builder.finish().await?;
207 let mut gzip_encoder = tar_builder.into_inner().await?;
208 gzip_encoder.shutdown().await?;
209 Ok(())
210}