use bytes::{BufMut, BytesMut};
use sha1::{Digest, Sha1};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tracing::{debug, info};
use crate::YamlBaseError;
use crate::config::Config;
use crate::database::Storage;
use crate::protocol::mysql_caching_sha2::{CACHING_SHA2_PLUGIN_NAME, CachingSha2Auth};
use crate::sql::{QueryExecutor, parse_sql};
/// Protocol version byte written first in the server handshake packet.
const PROTOCOL_VERSION: u8 = 10;
/// Server version string sent (NUL-terminated) in the server handshake packet.
const SERVER_VERSION: &str = "8.0.35-yamlbase";
/// Authentication plugin name advertised at the end of the server handshake packet.
const AUTH_PLUGIN_NAME: &str = "mysql_native_password";
// Command bytes: the first byte of every packet received in the command loop
// is matched against these values.
const COM_QUIT: u8 = 0x01;
const COM_INIT_DB: u8 = 0x02;
const COM_QUERY: u8 = 0x03;
const COM_PING: u8 = 0x0e;
// Capability flag bits. `send_handshake` ORs these together (all except
// `_CLIENT_DEPRECATE_EOF`) and advertises the result to the client.
const CLIENT_LONG_PASSWORD: u32 = 0x00000001;
const CLIENT_FOUND_ROWS: u32 = 0x00000002;
const CLIENT_LONG_FLAG: u32 = 0x00000004;
const CLIENT_CONNECT_WITH_DB: u32 = 0x00000008;
const CLIENT_PROTOCOL_41: u32 = 0x00000200;
const CLIENT_SECURE_CONNECTION: u32 = 0x00008000;
const CLIENT_PLUGIN_AUTH: u32 = 0x00080000;
// Not referenced by the code in this file; the leading underscore silences
// the unused-constant warning.
const _CLIENT_DEPRECATE_EOF: u32 = 0x01000000;
/// Column type code written into every column definition of a result set.
const MYSQL_TYPE_VAR_STRING: u8 = 253;
/// Status flag word sent in the handshake, OK and EOF packets.
const SERVER_STATUS_AUTOCOMMIT: u16 = 0x0002;
/// Server-side handler for MySQL wire-protocol connections.
///
/// One instance serves connections through `handle_connection`, which
/// performs the handshake, authenticates the client against the configured
/// credentials and then answers commands using the query executor.
pub struct MySqlProtocol {
// Shared configuration; the expected username and password are read from it
// during authentication.
config: Arc<Config>,
// Executes parsed SQL statements on behalf of connected clients.
executor: QueryExecutor,
// Initialised to an empty string by `new`; not read anywhere in this file.
_database_name: String,
}
/// Mutable per-connection protocol state threaded through the packet helpers.
struct ConnectionState {
// Sequence number stamped on the next outgoing packet. `write_packet`
// increments it after each packet; `read_packet` sets it to the received
// sequence number plus one.
sequence_id: u8,
// Initialised to 0 and never updated in this file.
_capabilities: u32,
// Random challenge bytes from `generate_auth_data`; sent to the client in
// the handshake and used to verify a `mysql_native_password` response.
auth_data: Vec<u8>,
// Authentication plugin name parsed from the client's handshake response,
// if one was found.
client_auth_plugin: Option<String>,
}
impl Default for ConnectionState {
fn default() -> Self {
Self {
sequence_id: 0,
_capabilities: 0,
auth_data: generate_auth_data(),
client_auth_plugin: None,
}
}
}
impl MySqlProtocol {
/// Creates a protocol handler that authenticates against `config` and runs
/// queries through a `QueryExecutor` built on top of `storage`.
///
/// # Errors
/// Propagates any error returned while constructing the query executor.
pub async fn new(config: Arc<Config>, storage: Arc<Storage>) -> crate::Result<Self> {
    let protocol = Self {
        executor: QueryExecutor::new(storage).await?,
        config,
        _database_name: String::new(),
    };
    Ok(protocol)
}
/// Serves a single client connection from handshake to disconnect.
///
/// Steps:
/// 1. Send the server handshake and parse the client's response.
/// 2. Reject the client with error 1045 when the username does not match the
///    configured one.
/// 3. Authenticate: when the client names the caching-SHA2 plugin or sends an
///    empty auth response, switch to `CachingSha2Auth`; otherwise compare the
///    response against the expected `mysql_native_password` token.
/// 4. Answer commands (`COM_QUERY`, `COM_PING`, `COM_INIT_DB`, `COM_QUIT`)
///    until the client quits or the socket can no longer be read.
///
/// Credentials and authentication tokens are deliberately never written to
/// the log, not even at debug level.
///
/// # Errors
/// Returns an error when writing to the socket fails, when the handshake
/// response is malformed, or when a query / database name is not valid UTF-8.
/// An authentication failure is reported to the client and yields `Ok(())`.
pub async fn handle_connection(&self, mut stream: TcpStream) -> crate::Result<()> {
    info!("New MySQL connection");
    let mut state = ConnectionState::default();

    self.send_handshake(&mut stream, &mut state).await?;
    let response_packet = self.read_packet(&mut stream, &mut state).await?;
    let (username, auth_response, _database, client_plugin) =
        self.parse_handshake_response(&response_packet)?;
    state.client_auth_plugin = client_plugin;

    debug!(
        "Authentication check - username: {}, expected: {}",
        username, self.config.username
    );
    if username != self.config.username {
        debug!("Username mismatch");
        self.send_error(&mut stream, &mut state, 1045, "28000", "Access denied")
            .await?;
        return Ok(());
    }

    let client_wants_caching = state
        .client_auth_plugin
        .as_ref()
        .map(|p| p == CACHING_SHA2_PLUGIN_NAME)
        .unwrap_or(false);

    if client_wants_caching || auth_response.is_empty() {
        debug!("Client requested caching_sha2_password or sent empty auth");
        // The auth switch uses its own fresh challenge, independent of the
        // one sent in the initial handshake.
        let caching_auth = CachingSha2Auth::new(generate_auth_data());
        caching_auth
            .send_auth_switch_request(&mut stream, &mut state.sequence_id)
            .await?;
        let auth_switch_response = self.read_packet(&mut stream, &mut state).await?;
        let auth_success = caching_auth
            .authenticate(
                &mut stream,
                &mut state.sequence_id,
                &username,
                "",
                &self.config.username,
                &self.config.password,
                auth_switch_response,
            )
            .await?;
        if !auth_success {
            self.send_error(&mut stream, &mut state, 1045, "28000", "Access denied")
                .await?;
            return Ok(());
        }
    } else {
        let expected = compute_auth_response(&self.config.password, &state.auth_data);
        // Only lengths are logged: the password and the derived tokens are secrets.
        debug!(
            "Password check - auth_response len: {}, expected len: {}",
            auth_response.len(),
            expected.len()
        );
        if auth_response != expected {
            debug!("Password mismatch");
            self.send_error(&mut stream, &mut state, 1045, "28000", "Access denied")
                .await?;
            return Ok(());
        }
    }

    self.send_ok(&mut stream, &mut state, 0, 0).await?;
    info!("MySQL authentication successful, entering command loop");

    // A read failure (typically the peer closing the socket) ends the session
    // without being reported as an error.
    while let Ok(packet) = self.read_packet(&mut stream, &mut state).await {
        let (command, body) = match packet.split_first() {
            Some((&command, body)) => (command, body),
            // Empty packets carry no command; ignore them.
            None => continue,
        };
        match command {
            COM_QUERY => {
                let query = std::str::from_utf8(body).map_err(|_| {
                    YamlBaseError::Protocol("Invalid UTF-8 in query".to_string())
                })?;
                self.handle_query(&mut stream, &mut state, query).await?;
            }
            COM_QUIT => {
                info!("Client disconnected");
                break;
            }
            COM_PING => {
                self.send_ok(&mut stream, &mut state, 0, 0).await?;
            }
            COM_INIT_DB => {
                // The database name is validated but otherwise ignored.
                let _db_name = std::str::from_utf8(body).map_err(|_| {
                    YamlBaseError::Protocol("Invalid UTF-8 in database name".to_string())
                })?;
                self.send_ok(&mut stream, &mut state, 0, 0).await?;
            }
            _ => {
                debug!("Unhandled command: 0x{:02x}", command);
                self.send_error(&mut stream, &mut state, 1047, "08S01", "Unknown command")
                    .await?;
            }
        }
    }
    Ok(())
}
/// Builds and sends the initial server handshake packet.
///
/// The packet carries the protocol version, the server version string, a
/// connection id of 1, the 20-byte challenge from `state.auth_data` (split
/// into an 8-byte and a 12-byte part), the advertised capability flags, the
/// character set id, the status flags and the default auth plugin name.
///
/// # Errors
/// Propagates any error from writing the packet to `stream`.
async fn send_handshake(
    &self,
    stream: &mut TcpStream,
    state: &mut ConnectionState,
) -> crate::Result<()> {
    let advertised = [
        CLIENT_LONG_PASSWORD,
        CLIENT_FOUND_ROWS,
        CLIENT_LONG_FLAG,
        CLIENT_CONNECT_WITH_DB,
        CLIENT_PROTOCOL_41,
        CLIENT_SECURE_CONNECTION,
        CLIENT_PLUGIN_AUTH,
    ]
    .iter()
    .fold(0u32, |flags, bit| flags | bit);
    let capabilities_low = (advertised & 0xFFFF) as u16;
    let capabilities_high = ((advertised >> 16) & 0xFFFF) as u16;

    let mut greeting = BytesMut::new();
    greeting.put_u8(PROTOCOL_VERSION);
    // Server version, NUL-terminated.
    greeting.put_slice(SERVER_VERSION.as_bytes());
    greeting.put_u8(0);
    // Connection id.
    greeting.put_u32_le(1);
    // First 8 bytes of the challenge, followed by a zero filler byte.
    greeting.put_slice(&state.auth_data[..8]);
    greeting.put_u8(0);
    greeting.put_u16_le(capabilities_low);
    // Character set id.
    greeting.put_u8(33);
    greeting.put_u16_le(SERVER_STATUS_AUTOCOMMIT);
    greeting.put_u16_le(capabilities_high);
    // Total challenge length including its trailing NUL (20 + 1).
    greeting.put_u8(21);
    // Ten reserved zero bytes.
    greeting.put_slice(&[0; 10]);
    // Remaining 12 bytes of the challenge, NUL-terminated.
    greeting.put_slice(&state.auth_data[8..20]);
    greeting.put_u8(0);
    // Default auth plugin name, NUL-terminated.
    greeting.put_slice(AUTH_PLUGIN_NAME.as_bytes());
    greeting.put_u8(0);

    self.write_packet(stream, state, &greeting).await
}
/// Decodes the client's handshake response packet.
///
/// Returns `(username, auth_response, database, auth_plugin)`:
/// * `username` – the NUL-terminated user name following the 32-byte fixed
///   header.
/// * `auth_response` – the bytes announced by the one-byte length prefix;
///   empty when the length is zero or runs past the end of the packet.
/// * `database` – present only when the client set `CLIENT_CONNECT_WITH_DB`
///   and sent a non-empty name.
/// * `auth_plugin` – present only when the client set `CLIENT_PLUGIN_AUTH`
///   and sent a non-empty name.
///
/// The optional fields are read one after the other, each advancing the
/// cursor, so a database name is never mistaken for the plugin name.
///
/// # Errors
/// Returns `YamlBaseError::Protocol` when the packet is too short to contain
/// the fixed header, the user name terminator or the auth length byte, or
/// when a textual field is not valid UTF-8. Malformed input never panics.
#[allow(clippy::type_complexity)]
fn parse_handshake_response(
    &self,
    packet: &[u8],
) -> crate::Result<(String, Vec<u8>, Option<String>, Option<String>)> {
    // Capability bits that gate the optional trailing fields. Declared here so
    // the parser carries its own wire-format definitions.
    const CLIENT_CONNECT_WITH_DB: u32 = 0x0000_0008;
    const CLIENT_PLUGIN_AUTH: u32 = 0x0008_0000;
    // capability flags (4) + max packet size (4) + character set (1) + reserved (23)
    const FIXED_HEADER_LEN: usize = 4 + 4 + 1 + 23;

    /// Consumes the next NUL-terminated field starting at `*pos` (a missing
    /// terminator means the field runs to the end of the packet) and advances
    /// `*pos` past it. Returns `None` for an absent or empty field.
    fn take_cstr<'a>(packet: &'a [u8], pos: &mut usize) -> Option<&'a [u8]> {
        let rest = packet.get(*pos..).filter(|rest| !rest.is_empty())?;
        let (field, consumed) = match rest.iter().position(|&b| b == 0) {
            Some(end) => (&rest[..end], end + 1),
            None => (rest, rest.len()),
        };
        *pos += consumed;
        if field.is_empty() {
            None
        } else {
            Some(field)
        }
    }

    debug!("Parsing handshake response, packet len: {}", packet.len());
    let malformed = || YamlBaseError::Protocol("Invalid handshake response".to_string());

    let header = packet.get(..FIXED_HEADER_LEN).ok_or_else(malformed)?;
    let client_flags = u32::from_le_bytes([header[0], header[1], header[2], header[3]]);
    debug!("Client capabilities: 0x{:08x}", client_flags);
    let mut pos = FIXED_HEADER_LEN;

    let username_len = packet[pos..]
        .iter()
        .position(|&b| b == 0)
        .ok_or_else(malformed)?;
    let username = std::str::from_utf8(&packet[pos..pos + username_len])
        .map_err(|_| YamlBaseError::Protocol("Invalid UTF-8 in username".to_string()))?
        .to_string();
    debug!("Username: {}", username);
    pos += username_len + 1;

    let auth_len = usize::from(*packet.get(pos).ok_or_else(malformed)?);
    debug!("Auth response length: {}", auth_len);
    pos += 1;
    // A length that overruns the packet is treated like an empty response.
    let auth_response = packet
        .get(pos..pos + auth_len)
        .map(|bytes| bytes.to_vec())
        .unwrap_or_default();
    if auth_response.is_empty() {
        debug!("Auth response empty or invalid length");
    }
    pos = (pos + auth_len).min(packet.len());

    let database = if client_flags & CLIENT_CONNECT_WITH_DB != 0 {
        match take_cstr(packet, &mut pos) {
            Some(raw) => Some(
                std::str::from_utf8(raw)
                    .map_err(|_| {
                        YamlBaseError::Protocol("Invalid UTF-8 in database".to_string())
                    })?
                    .to_string(),
            ),
            None => None,
        }
    } else {
        None
    };

    let auth_plugin = if client_flags & CLIENT_PLUGIN_AUTH != 0 {
        match take_cstr(packet, &mut pos) {
            Some(raw) => Some(
                std::str::from_utf8(raw)
                    .map_err(|_| {
                        YamlBaseError::Protocol("Invalid UTF-8 in auth plugin".to_string())
                    })?
                    .to_string(),
            ),
            None => None,
        }
    } else {
        None
    };
    debug!("Client auth plugin: {:?}", auth_plugin);

    Ok((username, auth_response, database, auth_plugin))
}
/// Handles one `COM_QUERY` text: rewrites it, executes it and sends the
/// response(s) to the client.
///
/// * An empty query is answered with syntax error 1064.
/// * `@@` system variables are substituted and backticks are stripped before
///   parsing.
/// * Any statement starting with `SET ` (including `SET NAMES` /
///   `SET CHARACTER SET`) is acknowledged with an OK packet and not executed.
/// * Each parsed statement is executed in turn; transaction commands and
///   results without columns and rows get an OK packet, other results are
///   sent as a result set, and execution failures are reported as error 1146.
///
/// # Errors
/// Only socket write failures are returned; SQL errors are reported to the
/// client instead.
async fn handle_query(
    &self,
    stream: &mut TcpStream,
    state: &mut ConnectionState,
    query: &str,
) -> crate::Result<()> {
    let sql = query.trim();
    if sql.is_empty() {
        debug!("Empty query received");
        self.send_error(stream, state, 1064, "42000", "Syntax error: Empty query")
            .await?;
        return Ok(());
    }
    let upper = sql.to_uppercase();

    let substituted = if sql.contains("@@") {
        self.preprocess_system_variables(sql)
    } else {
        sql.to_string()
    };
    let rewritten = if substituted.contains('`') {
        let stripped = substituted.replace('`', "");
        debug!("Removed backticks: {}", stripped);
        stripped
    } else {
        substituted
    };

    let sets_charset = upper.starts_with("SET NAMES") || upper.starts_with("SET CHARACTER SET");
    if sets_charset {
        debug!("Ignoring SET NAMES/CHARACTER SET command: {}", query);
        return self.send_ok(stream, state, 0, 0).await;
    }
    if upper.starts_with("SET ") {
        debug!("Ignoring SET command: {}", query);
        return self.send_ok(stream, state, 0, 0).await;
    }

    let statements = match parse_sql(&rewritten) {
        Err(e) => {
            let message = format!("Syntax error: {}", e);
            self.send_error(stream, state, 1064, "42000", &message)
                .await?;
            return Ok(());
        }
        Ok(parsed) => parsed,
    };

    for statement in statements {
        debug!("Executing statement: {:?}", statement);
        let is_transaction_command = match statement {
            sqlparser::ast::Statement::StartTransaction { .. }
            | sqlparser::ast::Statement::Commit { .. }
            | sqlparser::ast::Statement::Rollback { .. } => true,
            _ => false,
        };

        let result = match self.executor.execute(&statement).await {
            Ok(result) => result,
            Err(e) => {
                debug!("Query execution error: {}", e);
                self.send_error(stream, state, 1146, "42S02", &e.to_string())
                    .await?;
                continue;
            }
        };
        debug!(
            "Query executed successfully. Result: {} columns, {} rows",
            result.columns.len(),
            result.rows.len()
        );

        let nothing_to_return = result.columns.is_empty() && result.rows.is_empty();
        if is_transaction_command || nothing_to_return {
            debug!("Sending OK packet for transaction command or empty result");
            self.send_ok(stream, state, 0, 0).await?;
        } else {
            self.send_query_result(stream, state, &result).await?;
        }
    }
    Ok(())
}
fn preprocess_system_variables(&self, query: &str) -> String {
use once_cell::sync::Lazy;
use regex::Regex;
let query_upper = query.to_uppercase();
if !query_upper.starts_with("SELECT") || !query.contains("@@") {
return query.to_string();
}
static VERSION_RE: Lazy<Result<Regex, regex::Error>> = Lazy::new(|| {
Regex::new(
r"@@(?:(?:global|GLOBAL|Global|session|SESSION|Session)\.)?(?:version|VERSION|Version)\b",
)
});
static VERSION_COMMENT_RE: Lazy<Result<Regex, regex::Error>> = Lazy::new(|| {
Regex::new(
r"@@(?:(?:global|GLOBAL|Global|session|SESSION|Session)\.)?(?:version_comment|VERSION_COMMENT|Version_Comment)\b",
)
});
static MAX_ALLOWED_PACKET_RE: Lazy<Result<Regex, regex::Error>> = Lazy::new(|| {
Regex::new(
r"@@(?:(?:global|GLOBAL|Global|session|SESSION|Session)\.)?(?:max_allowed_packet|MAX_ALLOWED_PACKET|Max_Allowed_Packet)\b",
)
});
static SYSTEM_VAR_RE: Lazy<Result<Regex, regex::Error>> = Lazy::new(|| {
Regex::new(
r"@@(?:(?:global|GLOBAL|Global|session|SESSION|Session)\.)?([a-zA-Z_][a-zA-Z0-9_]*)\b",
)
});
let mut result = query.to_string();
if let Ok(ref version_re) = *VERSION_RE {
result = version_re
.replace_all(&result, "'8.0.35-yamlbase'")
.to_string();
} else {
debug!("Failed to compile VERSION_RE regex");
}
if let Ok(ref version_comment_re) = *VERSION_COMMENT_RE {
result = version_comment_re.replace_all(&result, "'1'").to_string();
} else {
debug!("Failed to compile VERSION_COMMENT_RE regex");
}
if let Ok(ref max_packet_re) = *MAX_ALLOWED_PACKET_RE {
result = max_packet_re.replace_all(&result, "67108864").to_string();
} else {
debug!("Failed to compile MAX_ALLOWED_PACKET_RE regex");
}
if !result.contains("@@") {
debug!("Preprocessed query: {} -> {}", query, result);
return result;
}
if let Ok(ref system_var_re) = *SYSTEM_VAR_RE {
result = system_var_re.replace_all(&result, "'1'").to_string();
} else {
debug!("Failed to compile SYSTEM_VAR_RE regex");
}
debug!("Preprocessed query: {} -> {}", query, result);
result
}
/// Renders a `QueryResult` as text and sends it as a result set.
///
/// Every cell is converted with `to_string()`; the column names and the
/// rendered cells are then handed to `send_simple_result_set` as string
/// slices.
///
/// # Errors
/// Propagates any error from `send_simple_result_set`.
async fn send_query_result(
    &self,
    stream: &mut TcpStream,
    state: &mut ConnectionState,
    result: &crate::sql::executor::QueryResult,
) -> crate::Result<()> {
    debug!(
        "Sending query result with {} columns and {} rows",
        result.columns.len(),
        result.rows.len()
    );

    let mut columns: Vec<&str> = Vec::new();
    for name in result.columns.iter() {
        columns.push(name.as_str());
    }
    debug!("Columns: {:?}", columns);

    // Owned text for every cell; the borrowed view below points into it.
    let mut rendered: Vec<Vec<String>> = Vec::new();
    for row in result.rows.iter() {
        let cells: Vec<String> = row.iter().map(|cell| cell.to_string()).collect();
        rendered.push(cells);
    }
    debug!("Converted {} rows to strings", rendered.len());

    let mut borrowed_rows: Vec<Vec<&str>> = Vec::with_capacity(rendered.len());
    for cells in rendered.iter() {
        borrowed_rows.push(cells.iter().map(String::as_str).collect());
    }

    debug!("Calling send_simple_result_set");
    self.send_simple_result_set(stream, state, &columns, &borrowed_rows)
        .await
}
/// Sends a text-protocol result set in which every column is a string column.
///
/// Packet sequence: column count, one column definition per column, an EOF
/// packet, one packet per row, and a final EOF packet.
///
/// * `columns` – column names, used for both the name and original-name
///   fields of each definition.
/// * `rows` – cell values as text. A cell whose text is exactly `"NULL"` is
///   sent as SQL NULL (marker byte `0xfb`), so a genuine string `NULL` cannot
///   be distinguished from a NULL value here.
///
/// The column count and all string lengths are written as length-encoded
/// integers, so more than 250 columns and names or values of any length are
/// encoded correctly.
///
/// # Errors
/// Propagates any error from writing a packet to `stream`.
async fn send_simple_result_set(
    &self,
    stream: &mut TcpStream,
    state: &mut ConnectionState,
    columns: &[&str],
    rows: &[Vec<&str>],
) -> crate::Result<()> {
    /// Appends `len` as a length-encoded integer.
    fn put_lenenc_len(buf: &mut Vec<u8>, len: usize) {
        let len = len as u64;
        match len {
            0..=250 => buf.push(len as u8),
            251..=0xffff => {
                buf.push(0xfc);
                buf.extend_from_slice(&(len as u16).to_le_bytes());
            }
            0x1_0000..=0xff_ffff => {
                buf.push(0xfd);
                // Low three bytes, little-endian.
                buf.extend_from_slice(&len.to_le_bytes()[..3]);
            }
            _ => {
                buf.push(0xfe);
                buf.extend_from_slice(&len.to_le_bytes());
            }
        }
    }

    /// Appends `bytes` as a length-encoded string (length prefix + data).
    fn put_lenenc_str(buf: &mut Vec<u8>, bytes: &[u8]) {
        put_lenenc_len(buf, bytes.len());
        buf.extend_from_slice(bytes);
    }

    /// Builds an EOF packet: marker, zero warnings, status flags.
    fn eof_packet() -> Vec<u8> {
        let mut eof = vec![0xfe];
        eof.extend_from_slice(&0u16.to_le_bytes());
        eof.extend_from_slice(&SERVER_STATUS_AUTOCOMMIT.to_le_bytes());
        eof
    }

    debug!(
        "send_simple_result_set: {} columns, {} rows",
        columns.len(),
        rows.len()
    );

    let mut count_packet = Vec::new();
    put_lenenc_len(&mut count_packet, columns.len());
    debug!("Writing column count packet");
    self.write_packet(stream, state, &count_packet).await?;

    debug!("Writing {} column definitions", columns.len());
    for (idx, column) in columns.iter().enumerate() {
        debug!("Writing column definition {}: {}", idx, column);
        let name = column.as_bytes();
        let mut definition = Vec::with_capacity(32 + 2 * name.len());
        put_lenenc_str(&mut definition, b"def"); // catalog
        definition.push(0); // schema (empty)
        definition.push(0); // table (empty)
        definition.push(0); // original table (empty)
        put_lenenc_str(&mut definition, name); // name
        put_lenenc_str(&mut definition, name); // original name
        definition.push(0x0c); // length of the fixed-size fields that follow
        definition.extend_from_slice(&33u16.to_le_bytes()); // character set id, as in the handshake
        definition.extend_from_slice(&255u32.to_le_bytes()); // column length
        definition.push(MYSQL_TYPE_VAR_STRING); // column type
        definition.extend_from_slice(&0u16.to_le_bytes()); // flags
        definition.push(0); // decimals
        definition.extend_from_slice(&0u16.to_le_bytes()); // filler
        self.write_packet(stream, state, &definition).await?;
    }

    debug!("Sending EOF packet after column definitions");
    self.write_packet(stream, state, &eof_packet()).await?;

    debug!("Sending {} rows", rows.len());
    // Each row is encoded and written as its own packet, so only one row's
    // bytes are buffered at a time regardless of the result-set size.
    for (idx, row) in rows.iter().enumerate() {
        debug!("Sending row {} with {} values", idx, row.len());
        let mut row_packet = Vec::new();
        for value in row.iter() {
            if *value == "NULL" {
                row_packet.push(0xfb);
            } else {
                put_lenenc_str(&mut row_packet, value.as_bytes());
            }
        }
        debug!("Row packet size: {} bytes", row_packet.len());
        self.write_packet(stream, state, &row_packet).await?;
    }

    debug!("Sending final EOF packet");
    self.write_packet(stream, state, &eof_packet()).await
}
/// Sends an OK packet reporting `affected_rows`.
///
/// The packet consists of the `0x00` header, the affected-row count and a
/// zero second counter (both length-encoded), the autocommit status flags and
/// a zero warning count. `_info` is accepted for interface compatibility and
/// is not used.
///
/// # Errors
/// Propagates any error from writing the packet to `stream`.
async fn send_ok(
    &self,
    stream: &mut TcpStream,
    state: &mut ConnectionState,
    affected_rows: u64,
    _info: u64,
) -> crate::Result<()> {
    const OK_HEADER: u8 = 0x00;
    const LAST_INSERT_ID: u64 = 0;
    const WARNING_COUNT: u16 = 0;

    let mut ok = BytesMut::new();
    ok.put_u8(OK_HEADER);
    put_lenenc_int(&mut ok, affected_rows);
    put_lenenc_int(&mut ok, LAST_INSERT_ID);
    ok.put_u16_le(SERVER_STATUS_AUTOCOMMIT);
    ok.put_u16_le(WARNING_COUNT);

    self.write_packet(stream, state, &ok).await
}
/// Sends an error packet to the client.
///
/// Layout: `0xff` header, `error_code` (little-endian), the `#` marker, the
/// bytes of `sql_state`, then the bytes of `message`. Neither string is
/// length-prefixed or terminated, and `sql_state` is written exactly as
/// passed.
///
/// # Errors
/// Propagates any error from writing the packet to `stream`.
async fn send_error(
    &self,
    stream: &mut TcpStream,
    state: &mut ConnectionState,
    error_code: u16,
    sql_state: &str,
    message: &str,
) -> crate::Result<()> {
    const ERR_HEADER: u8 = 0xff;
    const SQL_STATE_MARKER: u8 = b'#';

    let mut err = BytesMut::new();
    err.put_u8(ERR_HEADER);
    err.put_u16_le(error_code);
    err.put_u8(SQL_STATE_MARKER);
    for text in [sql_state, message] {
        err.put_slice(text.as_bytes());
    }

    self.write_packet(stream, state, &err).await
}
/// Frames `payload` as one or more protocol packets and writes them to
/// `stream`.
///
/// Each frame has a 4-byte header (3-byte little-endian length followed by
/// the sequence id) and at most `0xffffff` payload bytes. `state.sequence_id`
/// is stamped on every frame and incremented (wrapping) afterwards.
///
/// A frame of exactly `0xffffff` bytes tells the peer that more data follows,
/// so a payload whose length is a multiple of `0xffffff` is terminated with
/// an additional empty frame. An empty payload is sent as a single empty
/// frame.
///
/// # Errors
/// Propagates any I/O error from writing to or flushing `stream`.
async fn write_packet(
    &self,
    stream: &mut TcpStream,
    state: &mut ConnectionState,
    payload: &[u8],
) -> crate::Result<()> {
    const MAX_PACKET_SIZE: usize = 0xffffff;

    let mut remaining = payload;
    loop {
        let frame_len = remaining.len().min(MAX_PACKET_SIZE);
        let (body, rest) = remaining.split_at(frame_len);

        // Header and body go into one buffer so each frame is a single write.
        let mut frame = BytesMut::with_capacity(4 + frame_len);
        frame.put_slice(&(frame_len as u32).to_le_bytes()[..3]);
        frame.put_u8(state.sequence_id);
        frame.put_slice(body);
        debug!(
            "Writing packet frame: len={}, seq={}, remaining_after={}",
            frame_len,
            state.sequence_id,
            rest.len()
        );
        state.sequence_id = state.sequence_id.wrapping_add(1);
        stream.write_all(&frame).await?;

        remaining = rest;
        // Only a frame shorter than the maximum ends the payload; after a
        // full-size frame another (possibly empty) frame must follow.
        if frame_len < MAX_PACKET_SIZE {
            break;
        }
    }
    stream.flush().await?;
    Ok(())
}
/// Reads one logical packet from `stream` and returns its payload.
///
/// Each frame starts with a 4-byte header: a 3-byte little-endian length and
/// a sequence id. After every frame `state.sequence_id` is set to the
/// received sequence id plus one (wrapping), which is the id the next
/// outgoing packet will carry.
///
/// A frame of exactly `0xffffff` bytes means the payload continues in the
/// next frame; such frames are concatenated until a shorter one arrives.
/// The reassembled payload is capped at 64 MiB, the `max_allowed_packet`
/// value this server reports to clients.
///
/// # Errors
/// Propagates I/O errors (including end-of-stream) and returns an
/// `InvalidData` I/O error when the payload would exceed the size cap.
async fn read_packet(
    &self,
    stream: &mut TcpStream,
    state: &mut ConnectionState,
) -> crate::Result<Vec<u8>> {
    const MAX_FRAME_SIZE: usize = 0xff_ffff;
    const MAX_PAYLOAD_SIZE: usize = 64 * 1024 * 1024;

    let mut payload = Vec::new();
    loop {
        let mut header = [0u8; 4];
        stream.read_exact(&mut header).await?;
        let frame_len = usize::from(header[0])
            | (usize::from(header[1]) << 8)
            | (usize::from(header[2]) << 16);
        state.sequence_id = header[3].wrapping_add(1);

        let filled = payload.len();
        if filled + frame_len > MAX_PAYLOAD_SIZE {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "packet exceeds maximum allowed size",
            )
            .into());
        }
        payload.resize(filled + frame_len, 0);
        stream.read_exact(&mut payload[filled..]).await?;

        // A frame shorter than the maximum is the last one of the payload.
        if frame_len < MAX_FRAME_SIZE {
            return Ok(payload);
        }
    }
}
}
/// Generates the 20-byte random challenge used for authentication.
///
/// Every byte is in the range `1..=127`. The handshake packet sends the
/// challenge followed by a NUL terminator, so a zero byte inside the
/// challenge would make it ambiguous for clients that read it as a
/// NUL-terminated string, leading to sporadic authentication failures.
fn generate_auth_data() -> Vec<u8> {
    use rand::Rng;
    let mut rng = rand::thread_rng();
    let mut auth_data = vec![0u8; 20];
    rng.fill(&mut auth_data[..]);
    // Fold each random byte into 1..=127 so the challenge never contains NUL.
    for byte in auth_data.iter_mut() {
        *byte = (*byte % 127) + 1;
    }
    auth_data
}
fn compute_auth_response(password: &str, auth_data: &[u8]) -> Vec<u8> {
if password.is_empty() {
return Vec::new();
}
let mut hasher = Sha1::new();
hasher.update(password.as_bytes());
let stage1 = hasher.finalize();
let mut hasher = Sha1::new();
hasher.update(stage1);
let stage2 = hasher.finalize();
let mut hasher = Sha1::new();
hasher.update(auth_data);
hasher.update(stage2);
let result = hasher.finalize();
stage1
.iter()
.zip(result.iter())
.map(|(a, b)| a ^ b)
.collect()
}
/// Appends `value` to `buf` as a length-encoded integer:
///
/// * below 251 – the value itself in one byte;
/// * below 2^16 – `0xfc` followed by 2 little-endian bytes;
/// * below 2^24 – `0xfd` followed by 3 little-endian bytes;
/// * otherwise – `0xfe` followed by 8 little-endian bytes.
fn put_lenenc_int(buf: &mut BytesMut, value: u64) {
    match value {
        0..=250 => buf.put_u8(value as u8),
        251..=0xffff => {
            buf.put_u8(0xfc);
            buf.put_u16_le(value as u16);
        }
        0x1_0000..=0xff_ffff => {
            buf.put_u8(0xfd);
            // Low three bytes of the little-endian representation.
            buf.put_slice(&value.to_le_bytes()[..3]);
        }
        _ => {
            buf.put_u8(0xfe);
            buf.put_u64_le(value);
        }
    }
}