d_engine_core/test_utils/
common.rs

1use std::ops::RangeInclusive;
2use std::path::PathBuf;
3use std::time::Duration;
4
5use bytes::Bytes;
6use bytes::BytesMut;
7use d_engine_proto::client::WriteCommand;
8use d_engine_proto::common::AddNode;
9use d_engine_proto::common::Entry;
10use d_engine_proto::common::EntryPayload;
11use d_engine_proto::common::membership_change::Change;
12use prost::Message;
13
14use crate::SnapshotConfig;
15use crate::convert::safe_kv_bytes;
16
17pub fn create_mixed_entries() -> Vec<Entry> {
18    let config_entry = Entry {
19        index: 1,
20        term: 1,
21        payload: Some(EntryPayload::config(Change::AddNode(AddNode {
22            node_id: 7,
23            address: "127.0.0.1:8080".into(),
24            status: d_engine_proto::common::NodeStatus::Promotable as i32,
25        }))),
26    };
27
28    let app_entry = Entry {
29        index: 2,
30        term: 1,
31        payload: Some(EntryPayload::command(generate_insert_commands(vec![1]))),
32    };
33
34    vec![config_entry, app_entry]
35}
36
37pub fn create_config_entries() -> Vec<Entry> {
38    let entry = Entry {
39        index: 1,
40        term: 1,
41        payload: Some(EntryPayload::config(Change::AddNode(AddNode {
42            node_id: 8,
43            address: "127.0.0.1:8080".into(),
44            status: d_engine_proto::common::NodeStatus::Promotable as i32,
45        }))),
46    };
47    vec![entry]
48}
49
50pub fn generate_insert_commands(ids: Vec<u64>) -> Bytes {
51    let mut buffer = BytesMut::new();
52
53    for id in ids {
54        let cmd = WriteCommand::insert(safe_kv_bytes(id), safe_kv_bytes(id));
55        cmd.encode(&mut buffer).expect("Failed to encode insert command");
56    }
57
58    buffer.freeze()
59}
60
61pub fn generate_delete_commands(range: RangeInclusive<u64>) -> Bytes {
62    let mut buffer = BytesMut::new();
63
64    for id in range {
65        let cmd = WriteCommand::delete(safe_kv_bytes(id));
66        cmd.encode(&mut buffer).expect("Failed to encode delete command");
67    }
68
69    buffer.freeze()
70}
71
72pub fn snapshot_config(snapshots_dir: PathBuf) -> SnapshotConfig {
73    SnapshotConfig {
74        max_log_entries_before_snapshot: 1,
75        snapshot_cool_down_since_last_check: Duration::from_secs(0),
76        cleanup_retain_count: 2,
77        snapshots_dir,
78        chunk_size: 1024,
79        retained_log_entries: 1,
80        sender_yield_every_n_chunks: 1,
81        receiver_yield_every_n_chunks: 1,
82        max_bandwidth_mbps: 1,
83        push_queue_size: 1,
84        cache_size: 1,
85        max_retries: 1,
86        transfer_timeout_in_sec: 1,
87        retry_interval_in_ms: 1,
88        snapshot_push_backoff_in_ms: 1,
89        snapshot_push_max_retry: 1,
90        push_timeout_in_ms: 100,
91        enable: true,
92        snapshots_dir_prefix: "snapshot-".to_string(),
93    }
94}