use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use bytes::Bytes;
use k256::ecdsa::{RecoveryId, Signature, SigningKey};
use serde::{Deserialize, Serialize};
use xenith_core::{
ChainId, KeyMetadata, MessageId, MessageStatus, MessagingTransport, Result as XResult,
SendOptions, StateKey, StateValue, TransactionSigner, XenithError,
};
use crate::options::encode_executor_lz_receive_option;
const LZ_ENDPOINT_V2: [u8; 20] = [
0x1a, 0x44, 0x07, 0x60, 0x50, 0x12, 0x58, 0x25, 0x90, 0x0e, 0x73, 0x6c, 0x50, 0x1f, 0x85, 0x9c,
0x50, 0xfe, 0x72, 0x8c,
];
#[derive(Serialize)]
struct RpcRequest<'a> {
jsonrpc: &'a str,
method: &'a str,
params: serde_json::Value,
id: u64,
}
#[derive(Deserialize)]
struct RpcResponse {
result: Option<String>,
error: Option<RpcError>,
}
#[derive(Deserialize)]
struct RpcError {
message: String,
}
#[derive(Deserialize)]
struct ScanResponse {
data: Option<Vec<ScanMessage>>,
}
#[derive(Deserialize)]
struct LogEntry {
topics: Vec<String>,
data: String,
#[serde(rename = "blockNumber")]
block_number: String,
}
#[derive(Deserialize)]
struct LogsRpcResponse {
result: Option<Vec<LogEntry>>,
error: Option<RpcError>,
}
#[derive(Deserialize)]
struct ScanMessage {
status: String,
#[serde(rename = "statusDetail")]
status_detail: Option<String>,
}
pub struct K256Signer {
key: SigningKey,
address: [u8; 20],
}
impl K256Signer {
pub fn from_hex(hex: &str) -> Result<Self, XenithError> {
let bytes = alloy_primitives::hex::decode(hex)
.map_err(|e| XenithError::Serialization(format!("invalid private key hex: {e}")))?;
if bytes.len() != 32 {
return Err(XenithError::Serialization(format!(
"private key must be 32 bytes, got {}",
bytes.len()
)));
}
let mut buf = [0u8; 32];
buf.copy_from_slice(&bytes);
Self::from_bytes(&buf)
}
pub fn from_bytes(bytes: &[u8; 32]) -> Result<Self, XenithError> {
let key = SigningKey::try_from(bytes.as_ref())
.map_err(|e| XenithError::Serialization(format!("invalid secp256k1 key: {e}")))?;
let address = derive_address(&key);
Ok(Self { key, address })
}
}
#[async_trait]
impl TransactionSigner for K256Signer {
fn address(&self) -> [u8; 20] {
self.address
}
async fn sign_transaction(
&self,
to: [u8; 20],
calldata: Bytes,
options: &SendOptions,
chain_id: ChainId,
) -> Result<Bytes, XenithError> {
sign_eip155(&self.key, to, &calldata, options, chain_id.0)
}
}
pub struct LayerZeroLiveTransport {
signer: Arc<dyn TransactionSigner>,
endpoint_address: [u8; 20],
chain_id: ChainId,
supported_chains: HashMap<ChainId, u32>,
rpc_url: String,
http_client: reqwest::Client,
tx_hashes: Arc<Mutex<HashMap<MessageId, String>>>,
last_seen_block: Arc<AtomicU64>,
}
impl LayerZeroLiveTransport {
pub fn new(
signer: Arc<dyn TransactionSigner>,
chain_id: ChainId,
rpc_url: String,
chain_mappings: Vec<(ChainId, u32)>,
) -> Self {
Self {
signer,
endpoint_address: LZ_ENDPOINT_V2,
chain_id,
supported_chains: chain_mappings.into_iter().collect(),
rpc_url,
http_client: reqwest::Client::new(),
tx_hashes: Arc::new(Mutex::new(HashMap::new())),
last_seen_block: Arc::new(AtomicU64::new(0)),
}
}
async fn eth_call_endpoint(&self, calldata: &[u8]) -> Result<Vec<u8>, XenithError> {
let data = format!("0x{}", alloy_primitives::hex::encode(calldata));
let to = format!("0x{}", alloy_primitives::hex::encode(self.endpoint_address));
let req = RpcRequest {
jsonrpc: "2.0",
method: "eth_call",
params: serde_json::json!([{"to": to, "data": data}, "latest"]),
id: 1,
};
let resp: RpcResponse = self
.http_client
.post(&self.rpc_url)
.json(&req)
.send()
.await
.map_err(|e| XenithError::Transport {
chain: self.chain_id,
message: e.to_string(),
})?
.json()
.await
.map_err(|e| XenithError::Transport {
chain: self.chain_id,
message: format!("JSON-RPC parse error: {e}"),
})?;
if let Some(err) = resp.error {
return Err(XenithError::Transport {
chain: self.chain_id,
message: format!("eth_call error: {}", err.message),
});
}
let hex_str = resp.result.ok_or_else(|| XenithError::Transport {
chain: self.chain_id,
message: "eth_call returned null result".into(),
})?;
alloy_primitives::hex::decode(hex_str.strip_prefix("0x").unwrap_or(&hex_str)).map_err(|e| {
XenithError::Transport {
chain: self.chain_id,
message: format!("eth_call hex decode: {e}"),
}
})
}
async fn eth_send_raw(&self, raw_tx: &[u8]) -> Result<String, XenithError> {
let hex_tx = format!("0x{}", alloy_primitives::hex::encode(raw_tx));
let req = RpcRequest {
jsonrpc: "2.0",
method: "eth_sendRawTransaction",
params: serde_json::json!([hex_tx]),
id: 1,
};
let resp: RpcResponse = self
.http_client
.post(&self.rpc_url)
.json(&req)
.send()
.await
.map_err(|e| XenithError::Transport {
chain: self.chain_id,
message: e.to_string(),
})?
.json()
.await
.map_err(|e| XenithError::Transport {
chain: self.chain_id,
message: format!("JSON-RPC parse error: {e}"),
})?;
if let Some(err) = resp.error {
return Err(XenithError::Transport {
chain: self.chain_id,
message: format!("eth_sendRawTransaction error: {}", err.message),
});
}
resp.result.ok_or_else(|| XenithError::Transport {
chain: self.chain_id,
message: "eth_sendRawTransaction returned null".into(),
})
}
async fn fetch_scan_status(&self, tx_hash: &str) -> Result<MessageStatus, XenithError> {
let url = format!("https://scan.layerzero-api.com/v1/messages/tx/{tx_hash}");
let resp: ScanResponse = self
.http_client
.get(&url)
.send()
.await
.map_err(|e| XenithError::Transport {
chain: self.chain_id,
message: format!("scan API error: {e}"),
})?
.json()
.await
.map_err(|e| XenithError::Transport {
chain: self.chain_id,
message: format!("scan API parse error: {e}"),
})?;
Ok(match resp.data.and_then(|msgs| msgs.into_iter().next()) {
Some(msg) => match msg.status.as_str() {
"INFLIGHT" => MessageStatus::InFlight,
"DELIVERED" => MessageStatus::Delivered,
"FAILED" => MessageStatus::Failed {
reason: msg.status_detail.unwrap_or_else(|| "unknown".into()),
},
_ => MessageStatus::Pending,
},
None => MessageStatus::Pending,
})
}
async fn eth_get_logs(
&self,
from_block: u64,
topic0: &str,
) -> Result<Vec<LogEntry>, XenithError> {
let from_hex = format!("0x{from_block:x}");
let to_addr = format!("0x{}", alloy_primitives::hex::encode(self.endpoint_address));
let req = RpcRequest {
jsonrpc: "2.0",
method: "eth_getLogs",
params: serde_json::json!([{
"fromBlock": from_hex,
"toBlock": "latest",
"address": to_addr,
"topics": [topic0]
}]),
id: 1,
};
let resp: LogsRpcResponse = self
.http_client
.post(&self.rpc_url)
.json(&req)
.send()
.await
.map_err(|e| XenithError::Transport {
chain: self.chain_id,
message: e.to_string(),
})?
.json()
.await
.map_err(|e| XenithError::Transport {
chain: self.chain_id,
message: format!("JSON-RPC parse error: {e}"),
})?;
if let Some(err) = resp.error {
return Err(XenithError::Transport {
chain: self.chain_id,
message: format!("eth_getLogs error: {}", err.message),
});
}
Ok(resp.result.unwrap_or_default())
}
}
#[async_trait]
impl MessagingTransport for LayerZeroLiveTransport {
async fn send_message(
&self,
destination: ChainId,
payload: Bytes,
options: SendOptions,
) -> XResult<MessageId> {
let &dst_eid = self
.supported_chains
.get(&destination)
.ok_or(XenithError::UnsupportedChain(destination))?;
let receiver = {
let addr = self.signer.address();
let mut buf = [0u8; 32];
buf[12..32].copy_from_slice(&addr);
buf
};
let lz_opts = encode_executor_lz_receive_option(options.gas_limit.unwrap_or(200_000));
let refund = options
.refund_address
.unwrap_or_else(|| self.signer.address());
let calldata = Bytes::from(build_send_calldata(
dst_eid, receiver, &payload, &lz_opts, refund,
));
let fee = self.estimate_fee(destination, payload.clone()).await?;
let tx_opts = SendOptions {
value: Some(fee),
..options
};
let signed_tx = self
.signer
.sign_transaction(self.endpoint_address, calldata, &tx_opts, self.chain_id)
.await?;
let tx_hash = self.eth_send_raw(&signed_tx).await?;
let id = tx_hash_to_message_id(&tx_hash)?;
self.tx_hashes
.lock()
.map_err(|_| XenithError::Transport {
chain: self.chain_id,
message: "tx_hashes mutex poisoned".into(),
})?
.insert(id, tx_hash);
Ok(id)
}
async fn estimate_fee(&self, destination: ChainId, payload: Bytes) -> XResult<u128> {
let &dst_eid = self
.supported_chains
.get(&destination)
.ok_or(XenithError::UnsupportedChain(destination))?;
let receiver = {
let addr = self.signer.address();
let mut buf = [0u8; 32];
buf[12..32].copy_from_slice(&addr);
buf
};
let lz_opts = encode_executor_lz_receive_option(200_000);
let sender = self.signer.address();
let calldata = build_quote_calldata(dst_eid, receiver, &payload, &lz_opts, sender);
let result = self.eth_call_endpoint(&calldata).await?;
if result.len() < 32 {
return Err(XenithError::Transport {
chain: self.chain_id,
message: format!("quote() returned {} bytes, expected ≥ 32", result.len()),
});
}
let native_fee =
u128::from_be_bytes(
result[16..32]
.try_into()
.map_err(|_| XenithError::Transport {
chain: self.chain_id,
message: "quote() fee slice conversion failed".into(),
})?,
);
Ok(native_fee)
}
async fn message_status(&self, id: MessageId) -> XResult<MessageStatus> {
let tx_hash = self
.tx_hashes
.lock()
.map_err(|_| XenithError::Transport {
chain: self.chain_id,
message: "tx_hashes mutex poisoned".into(),
})?
.get(&id)
.cloned()
.ok_or_else(|| XenithError::Transport {
chain: self.chain_id,
message: format!("unknown message ID: {:?}", id),
})?;
self.fetch_scan_status(&tx_hash).await
}
fn sender_address(&self) -> Option<[u8; 20]> {
Some(self.signer.address())
}
async fn poll_incoming(&self) -> XResult<Vec<(StateKey, StateValue, Option<KeyMetadata>)>> {
let topic0 = {
let hash = alloy_primitives::keccak256(b"PacketReceived(bytes,bytes,bytes32)");
format!("0x{}", alloy_primitives::hex::encode(hash.as_slice()))
};
let from_block = self.last_seen_block.load(Ordering::Relaxed);
let logs = match self.eth_get_logs(from_block, &topic0).await {
Ok(logs) => logs,
Err(_) => return Ok(vec![]),
};
let mut results = Vec::new();
let mut max_block = from_block;
for log in &logs {
if log.topics.first().map(|t| t.as_str()) != Some(&topic0) {
continue;
}
let block_num = u64::from_str_radix(
log.block_number
.strip_prefix("0x")
.unwrap_or(&log.block_number),
16,
)
.unwrap_or(0);
if block_num > max_block {
max_block = block_num;
}
let data_hex = log.data.strip_prefix("0x").unwrap_or(&log.data);
let data = match alloy_primitives::hex::decode(data_hex) {
Ok(d) => d,
Err(_) => continue,
};
if let Some(payload) = decode_packet_received_payload(&data) {
if let Ok((key, value, meta)) = xenith_core::wire::decode(&payload) {
results.push((key, value, meta));
}
}
}
if !logs.is_empty() {
self.last_seen_block.store(max_block + 1, Ordering::Relaxed);
}
Ok(results)
}
}
pub(crate) fn build_quote_calldata(
dst_eid: u32,
receiver: [u8; 32],
message: &[u8],
options: &[u8],
sender: [u8; 20],
) -> Vec<u8> {
let selector = fn_selector(b"quote((uint32,bytes32,bytes,bytes,bool),address)");
abi_encode_call(selector, dst_eid, receiver, message, options, false, sender)
}
pub(crate) fn build_send_calldata(
dst_eid: u32,
receiver: [u8; 32],
message: &[u8],
options: &[u8],
refund: [u8; 20],
) -> Vec<u8> {
let selector = fn_selector(b"send((uint32,bytes32,bytes,bytes,bool),address)");
abi_encode_call(selector, dst_eid, receiver, message, options, false, refund)
}
fn fn_selector(sig: &[u8]) -> [u8; 4] {
let hash = alloy_primitives::keccak256(sig);
[hash[0], hash[1], hash[2], hash[3]]
}
fn abi_encode_call(
selector: [u8; 4],
dst_eid: u32,
receiver: [u8; 32],
message: &[u8],
options: &[u8],
pay_in_lz_token: bool,
addr: [u8; 20],
) -> Vec<u8> {
let mut out = Vec::with_capacity(4 + 64 + 256);
out.extend_from_slice(&selector);
out.extend_from_slice(&pad_u64_to_32(64));
out.extend_from_slice(&pad_addr_to_32(addr));
let msg_padded = pad_to_32_multiple(message.len());
let msg_offset: u64 = 5 * 32; let opts_offset: u64 = msg_offset + 32 + msg_padded as u64;
out.extend_from_slice(&pad_u32_to_32(dst_eid));
out.extend_from_slice(&receiver);
out.extend_from_slice(&pad_u64_to_32(msg_offset));
out.extend_from_slice(&pad_u64_to_32(opts_offset));
out.extend_from_slice(&pad_bool_to_32(pay_in_lz_token));
out.extend_from_slice(&pad_u64_to_32(message.len() as u64));
out.extend_from_slice(message);
out.resize(out.len() + msg_padded - message.len(), 0);
let opts_padded = pad_to_32_multiple(options.len());
out.extend_from_slice(&pad_u64_to_32(options.len() as u64));
out.extend_from_slice(options);
out.resize(out.len() + opts_padded - options.len(), 0);
out
}
fn pad_to_32_multiple(n: usize) -> usize {
if n == 0 {
0
} else {
n.div_ceil(32) * 32
}
}
fn pad_u32_to_32(v: u32) -> [u8; 32] {
let mut buf = [0u8; 32];
buf[28..32].copy_from_slice(&v.to_be_bytes());
buf
}
fn pad_u64_to_32(v: u64) -> [u8; 32] {
let mut buf = [0u8; 32];
buf[24..32].copy_from_slice(&v.to_be_bytes());
buf
}
fn pad_addr_to_32(a: [u8; 20]) -> [u8; 32] {
let mut buf = [0u8; 32];
buf[12..32].copy_from_slice(&a);
buf
}
fn pad_bool_to_32(b: bool) -> [u8; 32] {
let mut buf = [0u8; 32];
if b {
buf[31] = 1;
}
buf
}
pub(crate) fn tx_hash_to_message_id(hash_hex: &str) -> Result<MessageId, XenithError> {
let hex = hash_hex.strip_prefix("0x").unwrap_or(hash_hex);
let bytes = alloy_primitives::hex::decode(hex).map_err(|e| XenithError::Transport {
chain: ChainId(0),
message: format!("invalid tx hash hex: {e}"),
})?;
let id_bytes: [u8; 8] = bytes
.get(0..8)
.ok_or_else(|| XenithError::Transport {
chain: ChainId(0),
message: format!("tx hash too short: {} bytes", bytes.len()),
})?
.try_into()
.map_err(|_| XenithError::Transport {
chain: ChainId(0),
message: "tx hash slice error".into(),
})?;
Ok(MessageId::from(u64::from_be_bytes(id_bytes)))
}
fn derive_address(key: &SigningKey) -> [u8; 20] {
let encoded = key.verifying_key().to_encoded_point(false); let pub_bytes = &encoded.as_bytes()[1..]; let hash = alloy_primitives::keccak256(pub_bytes);
let mut addr = [0u8; 20];
addr.copy_from_slice(&hash.as_slice()[12..]);
addr
}
fn sign_eip155(
key: &SigningKey,
to: [u8; 20],
data: &[u8],
options: &SendOptions,
chain_id: u64,
) -> Result<Bytes, XenithError> {
let nonce = options.nonce.unwrap_or(0);
let gas_price = options.max_fee_per_gas.unwrap_or(0);
let gas_limit = options.gas_limit.unwrap_or(200_000);
let value = options.value.unwrap_or(0);
let mut s = rlp::RlpStream::new_list(9);
s.append(&nonce);
rlp_append_u128(&mut s, gas_price);
s.append(&gas_limit);
s.append(&to.to_vec());
rlp_append_u128(&mut s, value);
s.append(&data.to_vec());
s.append(&chain_id);
s.append(&0u64);
s.append(&0u64);
let unsigned_rlp = s.out();
let hash = alloy_primitives::keccak256(&unsigned_rlp[..]);
let (sig, rec_id): (Signature, RecoveryId) = key
.sign_prehash_recoverable(hash.as_ref())
.map_err(|e| XenithError::Serialization(format!("signing failed: {e}")))?;
let r = sig.r().to_bytes();
let s_bytes = sig.s().to_bytes();
let v: u64 = rec_id.to_byte() as u64 + 35 + chain_id * 2;
let mut stream = rlp::RlpStream::new_list(9);
stream.append(&nonce);
rlp_append_u128(&mut stream, gas_price);
stream.append(&gas_limit);
stream.append(&to.to_vec());
rlp_append_u128(&mut stream, value);
stream.append(&data.to_vec());
stream.append(&v);
rlp_append_be_bytes(&mut stream, r.as_ref());
rlp_append_be_bytes(&mut stream, s_bytes.as_ref());
Ok(Bytes::from(stream.out().to_vec()))
}
fn rlp_append_u128(s: &mut rlp::RlpStream, v: u128) {
let bytes = v.to_be_bytes();
let skip = bytes.iter().position(|&b| b != 0).unwrap_or(16);
s.append(&bytes[skip..].to_vec());
}
fn rlp_append_be_bytes(s: &mut rlp::RlpStream, bytes: &[u8]) {
let skip = bytes.iter().position(|&b| b != 0).unwrap_or(bytes.len());
s.append(&bytes[skip..].to_vec());
}
fn decode_packet_received_payload(data: &[u8]) -> Option<bytes::Bytes> {
if data.len() < 96 {
return None;
}
let offset1 = u64::from_be_bytes(data[56..64].try_into().ok()?) as usize;
if data.len() < offset1 + 32 {
return None;
}
let payload_len =
u64::from_be_bytes(data[offset1 + 24..offset1 + 32].try_into().ok()?) as usize;
if data.len() < offset1 + 32 + payload_len {
return None;
}
Some(bytes::Bytes::copy_from_slice(
&data[offset1 + 32..offset1 + 32 + payload_len],
))
}
#[cfg(test)]
mod tests {
use super::*;
use xenith_core::SendOptions;
#[test]
fn test_fee_estimation_calldata() {
let calldata = build_quote_calldata(30110, [0u8; 32], &[], &[], [0u8; 20]);
let expected_selector = fn_selector(b"quote((uint32,bytes32,bytes,bytes,bool),address)");
assert_eq!(
&calldata[0..4],
&expected_selector,
"quote() calldata must start with the correct 4-byte selector"
);
}
#[test]
fn test_send_message_calldata() {
let calldata = build_send_calldata(30110, [0u8; 32], &[], &[], [0u8; 20]);
let expected_selector = fn_selector(b"send((uint32,bytes32,bytes,bytes,bool),address)");
assert_eq!(
&calldata[0..4],
&expected_selector,
"send() calldata must start with the correct 4-byte selector"
);
}
#[test]
fn test_message_id_derivation() {
let hash = "0x0102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20";
let id = tx_hash_to_message_id(hash).unwrap();
assert_eq!(id, MessageId::from(0x0102030405060708u64));
}
#[tokio::test]
async fn test_sign_eip155_known_vector() {
let signer = K256Signer::from_hex(
"4646464646464646464646464646464646464646464646464646464646464646",
)
.unwrap();
let to = [0x35u8; 20];
let opts = SendOptions {
nonce: Some(9),
max_fee_per_gas: Some(20_000_000_000),
gas_limit: Some(21_000),
value: Some(1_000_000_000_000_000_000),
..Default::default()
};
let signed = signer
.sign_transaction(to, Bytes::new(), &opts, ChainId(1))
.await
.unwrap();
let expected_hex = concat!(
"f86c",
"09",
"8504a817c800",
"825208",
"943535353535353535353535353535353535353535",
"880de0b6b3a7640000",
"80",
"25",
"a028ef61340bd939bc2195fe537567866003e1a15d3c71ff63e1590620aa636276",
"a067cbe9d8997f761aecb703304b3800ccf555c9f3dc64214b297fb1966a3b6d83"
);
let expected = alloy_primitives::hex::decode(expected_hex).unwrap();
assert_eq!(
signed.as_ref(),
expected.as_slice(),
"signed tx must match EIP-155 test vector"
);
}
}