#![allow(dead_code, unreachable_pub)]
use std::sync::Arc;
use std::time::Instant;
use bytes::BytesMut;
use magnetar_proto::{
ConnectionConfig, CreateProducerRequest, ProducerHandle, SequenceId, encode_command, pb,
};
use magnetar_runtime_moonpool::ConnectionShared;
/// Produces `count` pseudo-random seeds for a seed sweep.
///
/// The generator's starting state is taken from the `MOONPOOL_SEED`
/// environment variable. The value is trimmed and then read either as
/// hexadecimal (when prefixed with `0x` or `0X`) or as a decimal `u64`.
/// If the variable is unset, not valid Unicode, or fails to parse, the
/// state silently falls back to `0x4242_4242_4242_4242`.
///
/// Every seed is produced by advancing the state with a wrapping add of
/// `0x9E37_79B9_7F4A_7C15` and running the SplitMix64 xor-shift/multiply
/// finalizer over it, so the output for a given starting state is
/// deterministic and a shorter sweep is a prefix of a longer one.
pub fn sweep_seeds(count: usize) -> Vec<u64> {
    const FALLBACK_STATE: u64 = 0x4242_4242_4242_4242_u64;
    const STATE_STEP: u64 = 0x9E37_79B9_7F4A_7C15;

    /// Reads a trimmed decimal or `0x`/`0X`-prefixed hexadecimal `u64`.
    fn parse_seed(raw: &str) -> Option<u64> {
        let text = raw.trim();
        match text.strip_prefix("0x").or_else(|| text.strip_prefix("0X")) {
            Some(digits) => u64::from_str_radix(digits, 16).ok(),
            None => text.parse::<u64>().ok(),
        }
    }

    let mut state = match std::env::var("MOONPOOL_SEED") {
        Ok(raw) => parse_seed(&raw).unwrap_or(FALLBACK_STATE),
        Err(_) => FALLBACK_STATE,
    };

    let mut seeds = Vec::with_capacity(count);
    for _ in 0..count {
        state = state.wrapping_add(STATE_STEP);
        let mut mixed = state;
        mixed = (mixed ^ (mixed >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        mixed = (mixed ^ (mixed >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        seeds.push(mixed ^ (mixed >> 31));
    }
    seeds
}
/// Encodes a `Connected` command suitable for answering a handshake in tests.
///
/// The reply advertises server version `"magnetar-test"`, protocol version
/// 21, a maximum message size of 5 MiB and default feature flags.
///
/// # Panics
///
/// Panics with `"encode CommandConnected"` if `encode_command` fails.
pub fn handshake_response_bytes() -> BytesMut {
    let connected = pb::CommandConnected {
        server_version: String::from("magnetar-test"),
        protocol_version: Some(21),
        max_message_size: Some(5 * 1024 * 1024),
        feature_flags: Some(pb::FeatureFlags::default()),
    };
    let command = pb::BaseCommand {
        r#type: pb::base_command::Type::Connected as i32,
        connected: Some(connected),
        ..Default::default()
    };

    let mut frame = BytesMut::new();
    encode_command(&mut frame, &command).expect("encode CommandConnected");
    frame
}
/// Convenience form of [`handshake_complete_shared_with_config`] that uses
/// `ConnectionConfig::default()`.
///
/// `at` is forwarded unchanged as the timestamp for the handshake reply.
pub fn handshake_complete_shared(at: Instant) -> Arc<ConnectionShared> {
    let default_config = ConnectionConfig::default();
    handshake_complete_shared_with_config(at, default_config)
}
/// Creates a `ConnectionShared` from `config` and walks it through a
/// handshake using a locally fabricated `Connected` reply.
///
/// While holding the connection lock this starts the handshake, feeds the
/// bytes from [`handshake_response_bytes`] to the connection stamped with
/// `at`, and then polls one event and discards it.
///
/// # Panics
///
/// Panics with `"handshake"` if `begin_handshake` fails, or with
/// `"connected"` if the reply bytes are rejected.
pub fn handshake_complete_shared_with_config(
    at: Instant,
    config: ConnectionConfig,
) -> Arc<ConnectionShared> {
    let shared = ConnectionShared::new(config);
    // The reply does not depend on connection state, so build it up front.
    let connected_frame = handshake_response_bytes();

    let mut guard = shared.inner.lock();
    guard.begin_handshake().expect("handshake");
    guard.handle_bytes(at, &connected_frame).expect("connected");
    // Pop a single pending event and throw it away.
    let _ = guard.poll_event();
    // Release the lock before handing the connection back to the caller.
    drop(guard);

    shared
}
/// Creates a producer for `topic` on `shared` and immediately applies a
/// fabricated `ProducerSuccess` reply for it.
///
/// The request id is peeked *before* `create_producer` is called and is then
/// echoed in the reply (presumably the id that `create_producer` goes on to
/// use — confirm against the connection implementation). The reply names the
/// producer `magnetar-test-<handle id>`, reports `last_sequence_id = -1` and
/// sets `producer_ready = true`. After the reply is handled (stamped with
/// `at`), one event is polled and discarded.
///
/// The connection lock is taken twice — once to create the producer and once
/// to apply the reply — and is not held while the reply is being encoded.
///
/// # Panics
///
/// Panics with `"encode CommandProducerSuccess"` if encoding fails, or with
/// `"apply ProducerSuccess"` if the connection rejects the reply.
pub fn open_producer_ready(
    shared: &Arc<ConnectionShared>,
    topic: &str,
    at: Instant,
) -> ProducerHandle {
    let create_request = CreateProducerRequest {
        topic: String::from(topic),
        ..Default::default()
    };

    // Order matters here: read the upcoming request id first, then create.
    let mut guard = shared.inner.lock();
    let request_id = guard.peek_next_request_id_for_test();
    let handle = guard.create_producer(create_request);
    drop(guard);

    let producer_success = pb::CommandProducerSuccess {
        request_id,
        producer_name: format!("magnetar-test-{}", handle.0),
        last_sequence_id: Some(-1),
        schema_version: None,
        topic_epoch: None,
        producer_ready: Some(true),
    };
    let reply = pb::BaseCommand {
        r#type: pb::base_command::Type::ProducerSuccess as i32,
        producer_success: Some(producer_success),
        ..Default::default()
    };
    let mut frame = BytesMut::new();
    encode_command(&mut frame, &reply).expect("encode CommandProducerSuccess");

    let mut guard = shared.inner.lock();
    guard.handle_bytes(at, &frame).expect("apply ProducerSuccess");
    // Pop a single pending event and throw it away.
    let _ = guard.poll_event();
    drop(guard);

    handle
}
/// Encodes a `SendReceipt` command for the given producer and sequence id.
///
/// The receipt's message id carries only `ledger_id` and `entry_id`; the
/// partition, batch index, batch size and first-chunk id are left unset, the
/// ack set is empty, and `highest_sequence_id` is `None`.
///
/// # Panics
///
/// Panics with `"encode CommandSendReceipt"` if `encode_command` fails.
pub fn send_receipt_bytes(
    producer_handle: ProducerHandle,
    sequence_id: SequenceId,
    ledger_id: u64,
    entry_id: u64,
) -> BytesMut {
    let message_id = pb::MessageIdData {
        ledger_id,
        entry_id,
        partition: None,
        batch_index: None,
        ack_set: Vec::new(),
        batch_size: None,
        first_chunk_message_id: None,
    };
    let receipt = pb::CommandSendReceipt {
        producer_id: producer_handle.0,
        sequence_id: sequence_id.0,
        message_id: Some(message_id),
        highest_sequence_id: None,
    };
    let command = pb::BaseCommand {
        r#type: pb::base_command::Type::SendReceipt as i32,
        send_receipt: Some(receipt),
        ..Default::default()
    };

    let mut frame = BytesMut::new();
    encode_command(&mut frame, &command).expect("encode CommandSendReceipt");
    frame
}
/// Encodes an error-free `NewTxnResponse` command.
///
/// The response echoes `request_id` and carries `txn_most` / `txn_least` as
/// the most- and least-significant bits of the transaction id; `error` and
/// `message` are left unset.
///
/// # Panics
///
/// Panics with `"encode CommandNewTxnResponse"` if `encode_command` fails.
pub fn new_txn_response_bytes(request_id: u64, txn_most: u64, txn_least: u64) -> BytesMut {
    let response = pb::CommandNewTxnResponse {
        request_id,
        txnid_most_bits: Some(txn_most),
        txnid_least_bits: Some(txn_least),
        error: None,
        message: None,
    };
    let command = pb::BaseCommand {
        r#type: pb::base_command::Type::NewTxnResponse as i32,
        new_txn_response: Some(response),
        ..Default::default()
    };

    let mut frame = BytesMut::new();
    encode_command(&mut frame, &command).expect("encode CommandNewTxnResponse");
    frame
}
/// Encodes an error-free `AddPartitionToTxnResponse` command.
///
/// Only `request_id` is populated; the transaction id bits, `error` and
/// `message` are all left unset.
///
/// # Panics
///
/// Panics with `"encode CommandAddPartitionToTxnResponse"` if
/// `encode_command` fails.
pub fn add_partition_to_txn_response_bytes(request_id: u64) -> BytesMut {
    let response = pb::CommandAddPartitionToTxnResponse {
        request_id,
        txnid_most_bits: None,
        txnid_least_bits: None,
        error: None,
        message: None,
    };
    let command = pb::BaseCommand {
        r#type: pb::base_command::Type::AddPartitionToTxnResponse as i32,
        add_partition_to_txn_response: Some(response),
        ..Default::default()
    };

    let mut frame = BytesMut::new();
    encode_command(&mut frame, &command).expect("encode CommandAddPartitionToTxnResponse");
    frame
}
/// Encodes an error-free `AddSubscriptionToTxnResponse` command.
///
/// Only `request_id` is populated; the transaction id bits, `error` and
/// `message` are all left unset.
///
/// # Panics
///
/// Panics with `"encode CommandAddSubscriptionToTxnResponse"` if
/// `encode_command` fails.
pub fn add_subscription_to_txn_response_bytes(request_id: u64) -> BytesMut {
    let response = pb::CommandAddSubscriptionToTxnResponse {
        request_id,
        txnid_most_bits: None,
        txnid_least_bits: None,
        error: None,
        message: None,
    };
    let command = pb::BaseCommand {
        r#type: pb::base_command::Type::AddSubscriptionToTxnResponse as i32,
        add_subscription_to_txn_response: Some(response),
        ..Default::default()
    };

    let mut frame = BytesMut::new();
    encode_command(&mut frame, &command).expect("encode CommandAddSubscriptionToTxnResponse");
    frame
}
/// Encodes an error-free `EndTxnResponse` command.
///
/// Only `request_id` is populated; the transaction id bits, `error` and
/// `message` are all left unset.
///
/// # Panics
///
/// Panics with `"encode CommandEndTxnResponse"` if `encode_command` fails.
pub fn end_txn_response_bytes(request_id: u64) -> BytesMut {
    let response = pb::CommandEndTxnResponse {
        request_id,
        txnid_most_bits: None,
        txnid_least_bits: None,
        error: None,
        message: None,
    };
    let command = pb::BaseCommand {
        r#type: pb::base_command::Type::EndTxnResponse as i32,
        end_txn_response: Some(response),
        ..Default::default()
    };

    let mut frame = BytesMut::new();
    encode_command(&mut frame, &command).expect("encode CommandEndTxnResponse");
    frame
}
/// Encodes a `TopicMigrated` command addressed to a producer.
///
/// The producer handle's id is used as the resource id and the resource type
/// is `Producer`. `new_url` and `new_url_tls` are copied into the plain and
/// TLS broker service URL fields respectively; a `None` leaves that field
/// unset.
///
/// # Panics
///
/// Panics with `"encode CommandTopicMigrated"` if `encode_command` fails.
pub fn topic_migrated_bytes(
    producer_handle: ProducerHandle,
    new_url: Option<&str>,
    new_url_tls: Option<&str>,
) -> BytesMut {
    let migrated = pb::CommandTopicMigrated {
        resource_id: producer_handle.0,
        resource_type: pb::command_topic_migrated::ResourceType::Producer as i32,
        broker_service_url: new_url.map(|url| url.to_owned()),
        broker_service_url_tls: new_url_tls.map(|url| url.to_owned()),
    };
    let command = pb::BaseCommand {
        r#type: pb::base_command::Type::TopicMigrated as i32,
        topic_migrated: Some(migrated),
        ..Default::default()
    };

    let mut frame = BytesMut::new();
    encode_command(&mut frame, &command).expect("encode CommandTopicMigrated");
    frame
}