d_engine_core/test_utils/
snapshot.rs

1use std::path::Path;
2
3use bytes::BufMut;
4use bytes::Bytes;
5use bytes::BytesMut;
6use crc32fast::Hasher;
7use d_engine_proto::common::LogId;
8use d_engine_proto::server::storage::SnapshotChunk;
9use d_engine_proto::server::storage::SnapshotMetadata;
10use futures::StreamExt;
11use futures::TryStreamExt;
12use futures::stream;
13use http_body::Frame;
14use http_body_util::BodyExt;
15use http_body_util::StreamBody;
16use tokio::fs::File;
17use tokio::io::AsyncWriteExt;
18use tokio::sync::mpsc;
19use tokio_stream::wrappers::ReceiverStream;
20use tonic::Code;
21use tonic::Status;
22use tracing::debug;
23
24use crate::stream::GrpcStreamDecoder;
25
26pub fn create_test_snapshot_stream<T>(chunks: Vec<T>) -> tonic::Streaming<T>
27where
28    T: prost::Message + Default + 'static,
29{
30    // Convert chunks to encoded byte streams
31    let byte_stream = stream::iter(chunks.into_iter().map(|chunk| {
32        let mut buf = Vec::new();
33
34        chunk
35            .encode(&mut buf)
36            .map_err(|e| Status::new(Code::Internal, format!("Encoding failed: {e}")))?;
37
38        // Add Tonic frame header
39        let mut frame = BytesMut::new();
40        frame.put_u8(0); // No compression
41        debug!("buf.len()={}", buf.len());
42
43        frame.put_u32(buf.len() as u32); // Message length
44        frame.extend_from_slice(&buf);
45
46        Ok(frame.freeze())
47    }));
48
49    let body = StreamBody::new(
50        byte_stream
51            .map_ok(Frame::data)
52            .map_err(|e: Status| Status::new(Code::Internal, format!("Stream error: {e}"))),
53    );
54    tonic::Streaming::new_request(
55        GrpcStreamDecoder::<T>::new(),
56        body.boxed_unsync(),
57        None,
58        Some(1024 * 1024 * 1024),
59    )
60}
61
62pub async fn create_test_compressed_snapshot() -> (Vec<u8>, SnapshotMetadata) {
63    // Create a temporary directory for our test data
64    let temp_dir = tempfile::tempdir().unwrap();
65    let temp_path = temp_dir.path();
66
67    // Create metadata
68    let metadata = SnapshotMetadata {
69        last_included: Some(LogId { index: 5, term: 1 }),
70        checksum: Bytes::from(vec![1; 32]),
71    };
72
73    // Create test data file
74    let data_file = temp_path.join("test_data.bin");
75    tokio::fs::write(&data_file, b"test snapshot content").await.unwrap();
76
77    // Create metadata file
78    let metadata_bytes = bincode::serialize(&metadata).unwrap();
79    tokio::fs::write(temp_path.join("metadata.bin"), &metadata_bytes).await.unwrap();
80
81    // Create compressed file
82    let compressed_path = temp_path.join("snapshot.tar.gz");
83    let file = tokio::fs::File::create(&compressed_path).await.unwrap();
84    let gzip_encoder = async_compression::tokio::write::GzipEncoder::new(file);
85    let mut tar_builder = tokio_tar::Builder::new(gzip_encoder);
86
87    // Add files to tar
88    tar_builder.append_path_with_name(&data_file, "test_data.bin").await.unwrap();
89    tar_builder
90        .append_path_with_name(temp_path.join("metadata.bin"), "metadata.bin")
91        .await
92        .unwrap();
93
94    // Finish compression
95    tar_builder.finish().await.unwrap();
96    let mut gzip_encoder = tar_builder.into_inner().await.unwrap();
97    gzip_encoder.shutdown().await.unwrap();
98
99    // Read compressed data back
100    let compressed_data = tokio::fs::read(&compressed_path).await.unwrap();
101
102    (compressed_data, metadata)
103}
104
105#[allow(unused)]
106pub(crate) fn create_test_snapshot_stream_from_receiver<T>(
107    receiver: mpsc::Receiver<T>
108) -> tonic::Streaming<T>
109where
110    T: prost::Message + Default + 'static,
111{
112    let byte_stream = ReceiverStream::new(receiver).map(|item| {
113        let mut buf = Vec::new();
114        item.encode(&mut buf)
115            .map_err(|e| Status::new(Code::Internal, format!("Encoding failed: {e}")))?;
116
117        let mut frame = BytesMut::new();
118        frame.put_u8(0);
119        frame.put_u32(buf.len() as u32);
120        frame.extend_from_slice(&buf);
121        Ok(frame.freeze())
122    });
123
124    let body = StreamBody::new(
125        byte_stream
126            .map_ok(Frame::data)
127            .map_err(|e: Status| Status::new(Code::Internal, format!("Stream error: {e}"))),
128    );
129
130    tonic::Streaming::new_request(
131        GrpcStreamDecoder::<T>::new(),
132        body.boxed_unsync(),
133        None,
134        Some(1024 * 1024 * 1024),
135    )
136}
137
138/// Helper to create valid test chunk
139pub fn create_test_chunk(
140    seq: u32,
141    data: &[u8],
142    leader_term: u64,
143    leader_id: u32,
144    total_chunks: u32,
145) -> SnapshotChunk {
146    SnapshotChunk {
147        leader_term,
148        leader_id,
149        seq,
150        total_chunks,
151        chunk_checksum: Bytes::from(compute_checksum(data)),
152        metadata: Some(SnapshotMetadata {
153            last_included: Some(LogId {
154                index: 100,
155                term: leader_term,
156            }),
157            checksum: Bytes::new(),
158        }),
159        data: Bytes::from(data.to_vec()),
160    }
161}
162
163/// Helper to compute CRC32 checksum for test data
164fn compute_checksum(data: &[u8]) -> Vec<u8> {
165    let mut hasher = Hasher::new();
166    hasher.update(data);
167    hasher.finalize().to_be_bytes().to_vec()
168}
169
170/// Creates a fake compressed snapshot file for testing
171#[allow(unused)]
172pub async fn create_fake_compressed_snapshot(
173    path: &Path,
174    content: &[u8],
175) -> std::result::Result<(), Box<dyn std::error::Error>> {
176    use async_compression::tokio::write::GzipEncoder;
177    use tokio::io::AsyncWriteExt;
178
179    let file = File::create(path).await?;
180    let mut encoder = GzipEncoder::new(file);
181    encoder.write_all(content).await?;
182    encoder.shutdown().await?;
183    Ok(())
184}
185
186/// Creates a fake compressed snapshot with directory structure
187#[allow(unused)]
188pub async fn create_fake_dir_compressed_snapshot(
189    path: &Path,
190    files: &[(&str, &[u8])],
191) -> std::result::Result<(), Box<dyn std::error::Error>> {
192    use async_compression::tokio::write::GzipEncoder;
193    use tokio_tar::Builder;
194
195    let file = File::create(path).await?;
196    let gzip_encoder = GzipEncoder::new(file);
197    let mut tar_builder = Builder::new(gzip_encoder);
198
199    let temp_dir = tempfile::tempdir()?;
200    for (file_name, content) in files {
201        let file_path = temp_dir.path().join(file_name);
202        tokio::fs::write(&file_path, content).await?;
203        tar_builder.append_path(&file_path).await?;
204    }
205
206    tar_builder.finish().await?;
207    let mut gzip_encoder = tar_builder.into_inner().await?;
208    gzip_encoder.shutdown().await?;
209    Ok(())
210}