#![forbid(unsafe_code)]
use super::*;
use crate::wire::ProtocolLimits;
/// Writes the body of a transaction-switch request in its sessionless form.
///
/// The body always starts with `operation`, a zero byte and a zero `ub4`.
/// When `xid` is present it is described by `SESSIONLESS_FORMAT_ID`, its own
/// length and a block that is zero-padded (or truncated) to 128 bytes;
/// otherwise an all-zero descriptor is written. `flags` and `timeout` follow
/// in both cases, then a fixed run of marker fields, the padded block (only
/// when `xid` is present) and a closing zero `ub4`.
///
/// A length that does not fit in a `u32` is encoded as `0`.
pub(crate) fn write_tpc_txn_switch_body(
    writer: &mut TtcWriter,
    operation: u32,
    flags: u32,
    timeout: u32,
    xid: Option<&[u8]>,
) {
    // Size the identifier is padded (or cut) to before it is sent.
    const PADDED_XID_LEN: usize = 128;

    // Build the padded identifier once so that the descriptor and the raw
    // block written further down are driven by the same value.
    let padded_xid = xid.map(|global_txn_id| {
        let mut block = global_txn_id.to_vec();
        block.resize(PADDED_XID_LEN, 0);
        (u32::try_from(global_txn_id.len()).unwrap_or(0), block)
    });

    writer.write_ub4(operation);
    // NOTE(review): presumably the "no context" marker and length — confirm.
    writer.write_u8(0);
    writer.write_ub4(0);

    match &padded_xid {
        Some((id_len, block)) => {
            writer.write_ub4(SESSIONLESS_FORMAT_ID);
            writer.write_ub4(*id_len);
            writer.write_ub4(0);
            writer.write_u8(1);
            writer.write_ub4(u32::try_from(block.len()).unwrap_or(0));
        }
        None => {
            writer.write_ub4(0);
            writer.write_ub4(0);
            writer.write_ub4(0);
            writer.write_u8(0);
            writer.write_ub4(0);
        }
    }

    writer.write_ub4(flags);
    writer.write_ub4(timeout);

    // Fixed trailer shared by both variants; the values are constant here.
    writer.write_u8(1);
    writer.write_u8(1);
    writer.write_u8(1);
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_u8(0);
    writer.write_ub4(0);

    if let Some((_, block)) = &padded_xid {
        writer.write_raw(block);
    }
    writer.write_ub4(0);
}
/// Builds a complete sessionless transaction-switch request.
///
/// The payload is the `TNS_FUNC_TPC_TXN_SWITCH` function code tagged with
/// `seq_num`, then `token_num` as a `ub8`, then the body produced by
/// [`write_tpc_txn_switch_body`] for `operation`, `flags`, `timeout` and `xid`.
pub fn build_tpc_txn_switch_payload_with_seq(
    seq_num: u8,
    token_num: u64,
    operation: u32,
    flags: u32,
    timeout: u32,
    xid: Option<&[u8]>,
) -> Vec<u8> {
    let mut out = TtcWriter::new();

    // Header: function code + sequence number, followed by the token.
    out.write_function_code_with_seq(TNS_FUNC_TPC_TXN_SWITCH, seq_num);
    out.write_ub8(token_num);

    write_tpc_txn_switch_body(&mut out, operation, flags, timeout, xid);
    out.into_bytes()
}
/// Builds the piggyback form of a sessionless transaction-switch request.
///
/// Unlike [`build_tpc_txn_switch_payload_with_seq`], the header is written as
/// three explicit bytes — `TNS_MSG_TYPE_PIGGYBACK`, `TNS_FUNC_TPC_TXN_SWITCH`
/// and `seq_num` — before `token_num` and the shared request body.
pub fn build_sessionless_piggyback(
    seq_num: u8,
    token_num: u64,
    operation: u32,
    flags: u32,
    timeout: u32,
    xid: Option<&[u8]>,
) -> Vec<u8> {
    let mut out = TtcWriter::new();

    // Message type, function code and sequence number, one byte each.
    for header_byte in [TNS_MSG_TYPE_PIGGYBACK, TNS_FUNC_TPC_TXN_SWITCH, seq_num] {
        out.write_u8(header_byte);
    }
    out.write_ub8(token_num);

    write_tpc_txn_switch_body(&mut out, operation, flags, timeout, xid);
    out.into_bytes()
}
/// Decodes the sessionless transaction state held in the tail of `binary`.
///
/// Only the final two bytes are inspected: the second-to-last byte carries the
/// state flags and the last byte carries the sync version, which must be `1`.
///
/// Returns `Ok(Some(SessionlessTxnState::Unset))` when the "unset" flag is
/// present, `Ok(Some(SessionlessTxnState::Set { .. }))` when only the "set"
/// flag is present (with `started_on_server` taken from the "server" flag),
/// and `Ok(None)` when neither flag is present.
///
/// # Errors
///
/// Returns `ProtocolError::TtcDecode` when `binary` has fewer than two bytes
/// or when the sync version is not `1`.
pub fn decode_sessionless_txn_state(binary: &[u8]) -> Result<Option<SessionlessTxnState>> {
    let (state, sync_version) = match binary {
        [.., state, sync_version] => (*state, *sync_version),
        _ => return Err(ProtocolError::TtcDecode("short sessionless txn state")),
    };
    if sync_version != 1 {
        return Err(ProtocolError::TtcDecode("unknown transaction sync version"));
    }

    let unset = state & TNS_TPC_TXNID_SYNC_UNSET != 0;
    let set = state & TNS_TPC_TXNID_SYNC_SET != 0;

    // "Unset" wins over "set" when both flags are present.
    Ok(match (unset, set) {
        (true, _) => Some(SessionlessTxnState::Unset),
        (false, true) => Some(SessionlessTxnState::Set {
            started_on_server: state & TNS_TPC_TXNID_SYNC_SERVER != 0,
        }),
        (false, false) => None,
    })
}
/// Parses a sessionless transaction-switch response using
/// `ProtocolLimits::DEFAULT`.
///
/// This is a convenience wrapper around
/// [`parse_tpc_txn_switch_response_with_limits`]; the result and errors are
/// exactly those of that function.
pub fn parse_tpc_txn_switch_response(
    payload: &[u8],
    capabilities: ClientCapabilities,
) -> Result<Option<SessionlessTxnState>> {
    let default_limits = ProtocolLimits::DEFAULT;
    parse_tpc_txn_switch_response_with_limits(payload, capabilities, default_limits)
}
/// Parses a sessionless transaction-switch response under the given `limits`.
///
/// Messages are consumed one at a time until the payload is exhausted, an
/// end-of-response marker is seen, or an unrecognised message type is met.
/// The returned value is the most recent sessionless state reported by a
/// server-side piggyback, or `None` when no piggyback reported one.
///
/// # Errors
///
/// Propagates any reader error, and returns
/// `ProtocolError::ServerErrorInfo` when an error message carries a non-zero
/// error number.
pub fn parse_tpc_txn_switch_response_with_limits(
    payload: &[u8],
    capabilities: ClientCapabilities,
    limits: ProtocolLimits,
) -> Result<Option<SessionlessTxnState>> {
    let mut reader = TtcReader::with_limits(payload, limits)?;
    let mut latest_state = None;

    while reader.remaining() > 0 {
        match reader.read_u8()? {
            // A zero byte carries no message; move on to the next byte.
            0 => {}
            TNS_MSG_TYPE_STATUS => {
                // Both fields are read only to advance past them.
                let _call_status = reader.read_ub4()?;
                let _seq = reader.read_ub2()?;
            }
            TNS_MSG_TYPE_PARAMETER => {
                let _application_value = reader.read_ub4()?;
                let context_len = reader.read_ub2()?;
                // The returned context is not needed by this call.
                if context_len > 0 {
                    reader.skip(usize::from(context_len))?;
                }
            }
            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
                // Keep the previous state when this piggyback reports none.
                let update = skip_server_side_piggyback(&mut reader)?;
                if update.is_some() {
                    latest_state = update;
                }
            }
            TNS_MSG_TYPE_END_OF_RESPONSE => break,
            TNS_MSG_TYPE_ERROR => {
                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
                if info.number != 0 {
                    let details = info.into_details();
                    return Err(ProtocolError::ServerErrorInfo(Box::new(details)));
                }
            }
            // Any other message type ends parsing without an error.
            _ => break,
        }
    }

    Ok(latest_state)
}
/// Builds the piggyback message that begins a pipeline.
///
/// The payload is `TNS_MSG_TYPE_PIGGYBACK`, `TNS_FUNC_PIPELINE_BEGIN` and
/// `seq_num` as single bytes, `token_num` as a `ub8`, a zero `ub2`, a zero
/// byte, and finally `pipeline_mode`.
pub fn build_begin_pipeline_piggyback(seq_num: u8, token_num: u64, pipeline_mode: u8) -> Vec<u8> {
    let mut out = TtcWriter::new();

    // Message type, function code and sequence number, one byte each.
    for header_byte in [TNS_MSG_TYPE_PIGGYBACK, TNS_FUNC_PIPELINE_BEGIN, seq_num] {
        out.write_u8(header_byte);
    }
    out.write_ub8(token_num);

    // NOTE(review): meaning of the two zero fields is not visible here — confirm.
    out.write_ub2(0);
    out.write_u8(0);
    out.write_u8(pipeline_mode);

    out.into_bytes()
}
/// Builds the request that ends a pipeline.
///
/// The payload is the `TNS_FUNC_PIPELINE_END` function code tagged with
/// `seq_num`, followed by a zero `ub8` and a zero `ub4`.
pub fn build_end_pipeline_payload_with_seq(seq_num: u8) -> Vec<u8> {
    let mut out = TtcWriter::new();
    out.write_function_code_with_seq(TNS_FUNC_PIPELINE_END, seq_num);
    // No token is sent with this call; the final field is a constant zero.
    out.write_ub8(0);
    out.write_ub4(0);
    out.into_bytes()
}
/// Borrowed view of a transaction identifier used by the TPC request builders.
///
/// The struct only holds a `u32` and two borrowed byte slices, so it is cheap
/// to copy and can be compared field by field.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct TpcXid<'a> {
    /// Format identifier written first in the XID descriptor.
    pub format_id: u32,
    /// Global transaction id bytes; placed first in the 128-byte XID block.
    pub global_transaction_id: &'a [u8],
    /// Branch qualifier bytes; placed directly after the global transaction id.
    pub branch_qualifier: &'a [u8],
}
/// Writes the five-field XID descriptor that precedes the raw XID block.
///
/// With an `xid`, the fields are its format id, the lengths of its global
/// transaction id and branch qualifier, a `1` marker byte and the constant
/// block length `128`. Without one, every field is zero. A length that does
/// not fit in a `u32` is encoded as `0`.
fn write_xid_descriptor(writer: &mut TtcWriter, xid: Option<&TpcXid<'_>>) {
    // Pick the values first so the write sequence exists in one place only.
    let (format_id, global_id_len, branch_len, block_marker, block_len): (u32, u32, u32, u8, u32) =
        match xid {
            Some(id) => (
                id.format_id,
                u32::try_from(id.global_transaction_id.len()).unwrap_or(0),
                u32::try_from(id.branch_qualifier.len()).unwrap_or(0),
                1,
                128,
            ),
            None => (0, 0, 0, 0, 0),
        };

    writer.write_ub4(format_id);
    writer.write_ub4(global_id_len);
    writer.write_ub4(branch_len);
    writer.write_u8(block_marker);
    writer.write_ub4(block_len);
}
/// Writes the fixed 128-byte XID block for `xid`.
///
/// The block is the global transaction id immediately followed by the branch
/// qualifier, zero-padded to 128 bytes. Anything beyond 128 bytes of combined
/// input is dropped.
fn write_xid_block_bytes(writer: &mut TtcWriter, xid: &TpcXid<'_>) {
    let mut block = [0u8; 128];

    // `zip` stops at the shorter side, so overlong input is cut at 128 bytes
    // and short input leaves the remaining slots at zero.
    let source = xid
        .global_transaction_id
        .iter()
        .chain(xid.branch_qualifier);
    for (slot, byte) in block.iter_mut().zip(source) {
        *slot = *byte;
    }

    writer.write_raw(&block);
}
/// Builds a TPC transaction-switch request.
///
/// Layout, in order: the `TNS_FUNC_TPC_TXN_SWITCH` function code tagged with
/// `seq_num`, a zero `ub8`, `operation`, the context header (presence byte and
/// length), the XID descriptor, `flags`, `timeout`, a fixed run of marker
/// fields, the raw `context` bytes (if any), the 128-byte XID block (if any),
/// and a closing zero `ub4`.
///
/// A context length that does not fit in a `u32` is encoded as `0`.
pub fn build_tpc_switch_payload_with_seq(
    seq_num: u8,
    operation: u32,
    flags: u32,
    timeout: u32,
    xid: Option<&TpcXid<'_>>,
    context: Option<&[u8]>,
) -> Vec<u8> {
    let mut out = TtcWriter::new();
    out.write_function_code_with_seq(TNS_FUNC_TPC_TXN_SWITCH, seq_num);
    out.write_ub8(0);
    out.write_ub4(operation);

    // Context header: presence byte, then the byte length.
    let (context_marker, context_len): (u8, u32) = context.map_or((0, 0), |bytes| {
        (1, u32::try_from(bytes.len()).unwrap_or(0))
    });
    out.write_u8(context_marker);
    out.write_ub4(context_len);

    write_xid_descriptor(&mut out, xid);
    out.write_ub4(flags);
    out.write_ub4(timeout);

    // Fixed fields written with constant values for every request.
    out.write_u8(1);
    out.write_u8(1);
    out.write_u8(1);
    out.write_u8(0);
    out.write_ub4(0);
    out.write_u8(0);
    out.write_ub4(0);

    // Variable-length data follows the fixed part: context first, then XID.
    if let Some(bytes) = context {
        out.write_raw(bytes);
    }
    if let Some(id) = xid {
        write_xid_block_bytes(&mut out, id);
    }

    out.write_ub4(0);
    out.into_bytes()
}
/// Builds a TPC change-state request.
///
/// Layout, in order: the `TNS_FUNC_TPC_TXN_CHANGE_STATE` function code tagged
/// with `seq_num`, a zero `ub8`, `operation`, the context header (presence
/// byte and length), the XID descriptor, a zero `ub4`, `requested_state`, a
/// `1` marker byte, `flags`, the raw `context` bytes (if any) and the 128-byte
/// XID block (if any).
///
/// A context length that does not fit in a `u32` is encoded as `0`.
pub fn build_tpc_change_state_payload_with_seq(
    seq_num: u8,
    operation: u32,
    requested_state: u32,
    flags: u32,
    xid: Option<&TpcXid<'_>>,
    context: Option<&[u8]>,
) -> Vec<u8> {
    let mut out = TtcWriter::new();
    out.write_function_code_with_seq(TNS_FUNC_TPC_TXN_CHANGE_STATE, seq_num);
    out.write_ub8(0);
    out.write_ub4(operation);

    // Context header: presence byte, then the byte length.
    let (context_marker, context_len): (u8, u32) = context.map_or((0, 0), |bytes| {
        (1, u32::try_from(bytes.len()).unwrap_or(0))
    });
    out.write_u8(context_marker);
    out.write_ub4(context_len);

    write_xid_descriptor(&mut out, xid);

    out.write_ub4(0);
    out.write_ub4(requested_state);
    out.write_u8(1);
    out.write_ub4(flags);

    // Variable-length data follows the fixed part: context first, then XID.
    if let Some(bytes) = context {
        out.write_raw(bytes);
    }
    if let Some(id) = xid {
        write_xid_block_bytes(&mut out, id);
    }

    out.into_bytes()
}
/// Parses a TPC transaction-switch response using `ProtocolLimits::DEFAULT`.
///
/// This is a convenience wrapper around
/// [`parse_tpc_switch_response_with_limits`]; the result and errors are
/// exactly those of that function.
pub fn parse_tpc_switch_response(
    payload: &[u8],
    capabilities: ClientCapabilities,
) -> Result<TpcSwitchResponse> {
    let default_limits = ProtocolLimits::DEFAULT;
    parse_tpc_switch_response_with_limits(payload, capabilities, default_limits)
}
/// Parses a TPC transaction-switch response under the given `limits`.
///
/// Messages are consumed one at a time until the payload is exhausted, an
/// end-of-response marker is seen, or an unrecognised message type is met.
/// The response collects the returned context bytes, the "transaction in
/// progress" bit of the call status, and the most recent sessionless state
/// reported by a server-side piggyback.
///
/// # Errors
///
/// Propagates any reader error, and returns
/// `ProtocolError::ServerErrorInfo` when an error message carries a non-zero
/// error number.
pub fn parse_tpc_switch_response_with_limits(
    payload: &[u8],
    capabilities: ClientCapabilities,
    limits: ProtocolLimits,
) -> Result<TpcSwitchResponse> {
    let mut reader = TtcReader::with_limits(payload, limits)?;
    let mut parsed = TpcSwitchResponse::default();

    while reader.remaining() > 0 {
        match reader.read_u8()? {
            // A zero byte carries no message; move on to the next byte.
            0 => {}
            TNS_MSG_TYPE_STATUS => {
                let call_status = reader.read_ub4()?;
                let _seq = reader.read_ub2()?;
                parsed.txn_in_progress = call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0;
            }
            TNS_MSG_TYPE_PARAMETER => {
                let _application_value = reader.read_ub4()?;
                let context_len = usize::from(reader.read_ub2()?);
                // Copy the context out of the payload so the response owns it.
                parsed.context = reader.read_raw(context_len)?.to_vec();
            }
            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
                // Keep the previous state when this piggyback reports none.
                let update = skip_server_side_piggyback(&mut reader)?;
                if update.is_some() {
                    parsed.sessionless_state = update;
                }
            }
            TNS_MSG_TYPE_END_OF_RESPONSE => break,
            TNS_MSG_TYPE_ERROR => {
                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
                if info.number != 0 {
                    let details = info.into_details();
                    return Err(ProtocolError::ServerErrorInfo(Box::new(details)));
                }
                // An error message with number zero still carries a call status.
                parsed.txn_in_progress =
                    info.call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0;
            }
            // Any other message type ends parsing without an error.
            _ => break,
        }
    }

    Ok(parsed)
}
/// Parses a TPC change-state response using `ProtocolLimits::DEFAULT`.
///
/// This is a convenience wrapper around
/// [`parse_tpc_change_state_response_with_limits`]; the result and errors are
/// exactly those of that function.
pub fn parse_tpc_change_state_response(
    payload: &[u8],
    capabilities: ClientCapabilities,
) -> Result<TpcChangeStateResponse> {
    let default_limits = ProtocolLimits::DEFAULT;
    parse_tpc_change_state_response_with_limits(payload, capabilities, default_limits)
}
/// Parses a TPC change-state response under the given `limits`.
///
/// Messages are consumed one at a time until the payload is exhausted, an
/// end-of-response marker is seen, or an unrecognised message type is met.
/// The response collects the state value from the parameter message and the
/// "transaction in progress" bit of the call status. Server-side piggybacks
/// are consumed but their contents are discarded.
///
/// # Errors
///
/// Propagates any reader error, and returns
/// `ProtocolError::ServerErrorInfo` when an error message carries a non-zero
/// error number.
pub fn parse_tpc_change_state_response_with_limits(
    payload: &[u8],
    capabilities: ClientCapabilities,
    limits: ProtocolLimits,
) -> Result<TpcChangeStateResponse> {
    let mut reader = TtcReader::with_limits(payload, limits)?;
    let mut parsed = TpcChangeStateResponse::default();

    while reader.remaining() > 0 {
        match reader.read_u8()? {
            // A zero byte carries no message; move on to the next byte.
            0 => {}
            TNS_MSG_TYPE_STATUS => {
                let call_status = reader.read_ub4()?;
                let _seq = reader.read_ub2()?;
                parsed.txn_in_progress = call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0;
            }
            TNS_MSG_TYPE_PARAMETER => {
                let out_state = reader.read_ub4()?;
                parsed.state = out_state;
            }
            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
                // Only consumed to advance the reader; the result is unused.
                let _unused_state = skip_server_side_piggyback(&mut reader)?;
            }
            TNS_MSG_TYPE_END_OF_RESPONSE => break,
            TNS_MSG_TYPE_ERROR => {
                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
                if info.number != 0 {
                    let details = info.into_details();
                    return Err(ProtocolError::ServerErrorInfo(Box::new(details)));
                }
                // An error message with number zero still carries a call status.
                parsed.txn_in_progress =
                    info.call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0;
            }
            // Any other message type ends parsing without an error.
            _ => break,
        }
    }

    Ok(parsed)
}
/// Consumes `num_pairs` keyword/value pairs from `reader` and discards them.
///
/// The pairs are read by [`read_keyword_value_pairs_for_txn_state`], so any
/// error that function reports (including a malformed transaction-id value)
/// is reported here as well; only its successful result is thrown away.
pub(crate) fn skip_keyword_value_pairs(reader: &mut TtcReader<'_>, num_pairs: u16) -> Result<()> {
    let _discarded_state = read_keyword_value_pairs_for_txn_state(reader, num_pairs)?;
    Ok(())
}
/// Reads `num_pairs` keyword/value pairs and extracts the sessionless
/// transaction state, if one is present.
///
/// Each pair is a text value and a binary value, each announced by a `ub2`
/// that is non-zero when the value follows, and then a `ub2` keyword number.
/// Only pairs whose keyword is `TNS_KEYWORD_NUM_TRANSACTION_ID` and which
/// carry a binary value are decoded; the last such pair that yields a state
/// determines the result.
///
/// # Errors
///
/// Propagates reader errors and any error from
/// [`decode_sessionless_txn_state`].
pub(crate) fn read_keyword_value_pairs_for_txn_state(
    reader: &mut TtcReader<'_>,
    num_pairs: u16,
) -> Result<Option<SessionlessTxnState>> {
    let mut latest_state = None;

    for _ in 0..num_pairs {
        // The text value is never used; it is read only to advance the reader.
        if reader.read_ub2()? > 0 {
            let _text_value = reader.read_bytes()?;
        }

        let binary_value = if reader.read_ub2()? > 0 {
            reader.read_bytes()?
        } else {
            None
        };

        // The keyword follows the values, so it must be read for every pair.
        let keyword_num = reader.read_ub2()?;
        if keyword_num != TNS_KEYWORD_NUM_TRANSACTION_ID {
            continue;
        }

        if let Some(binary) = binary_value.as_deref() {
            let decoded = decode_sessionless_txn_state(binary)?;
            if decoded.is_some() {
                latest_state = decoded;
            }
        }
    }

    Ok(latest_state)
}
/// Unit tests for the TPC request builders and response parsers.
#[cfg(test)]
mod tpc_tests {
    use super::*;

    /// Returns a fixed (global transaction id, branch qualifier) pair.
    fn xid() -> ([u8; 7], [u8; 8]) {
        (*b"txn4400", *b"branchId")
    }

    #[test]
    fn tpc_begin_payload_encodes_format_branch_and_128_byte_xid() {
        let (global_id, branch_id) = xid();
        let tpc_xid = TpcXid {
            format_id: 4400,
            global_transaction_id: &global_id,
            branch_qualifier: &branch_id,
        };
        let payload = build_tpc_switch_payload_with_seq(
            4,
            TNS_TPC_TXN_START,
            TPC_TXN_FLAGS_NEW,
            0,
            Some(&tpc_xid),
            None,
        );
        assert_eq!(&payload[..3], &[3, TNS_FUNC_TPC_TXN_SWITCH, 4]);
        // Skip the three header bytes and the single byte that follows them.
        let body = &payload[4..];
        // Operation, then the "no context" marker and length.
        assert_eq!(&body[..4], &[1, 1, 0, 0]);
        // Format id 4400 (0x1130) with its length prefix.
        assert_eq!(&body[4..7], &[2, 0x11, 0x30]);
        // Lengths 7 and 8, the block marker, and the block length 128 (0x80).
        assert_eq!(&body[7..14], &[1, 7, 1, 8, 1, 1, 0x80]);
        // The XID block sits right before the final one-byte trailer.
        let block_start = payload.len() - 128 - 1;
        let block = &payload[block_start..block_start + 128];
        assert_eq!(&block[..7], b"txn4400");
        assert_eq!(&block[7..15], b"branchId");
        assert!(block[15..].iter().all(|&byte| byte == 0));
    }

    #[test]
    fn tpc_end_payload_echoes_context() {
        let context = vec![0xAAu8; 168];
        let payload =
            build_tpc_switch_payload_with_seq(7, TNS_TPC_TXN_DETACH, 0, 0, None, Some(&context));
        let body = &payload[4..];
        // Operation, context marker, then the context length 168 (0xA8).
        assert_eq!(&body[..5], &[1, 2, 1, 1, 0xA8]);
        assert!(payload
            .windows(context.len())
            .any(|window| window == context.as_slice()));
    }

    #[test]
    fn change_state_prepare_payload_shape() {
        let (global_id, branch_id) = xid();
        let tpc_xid = TpcXid {
            format_id: 4400,
            global_transaction_id: &global_id,
            branch_qualifier: &branch_id,
        };
        let payload = build_tpc_change_state_payload_with_seq(
            8,
            TNS_TPC_TXN_PREPARE,
            TNS_TPC_TXN_STATE_PREPARE,
            0,
            Some(&tpc_xid),
            None,
        );
        assert_eq!(&payload[..3], &[3, TNS_FUNC_TPC_TXN_CHANGE_STATE, 8]);
        let body = &payload[4..];
        assert_eq!(&body[..4], &[1, 3, 0, 0]);
    }

    #[test]
    fn switch_response_captures_context_and_txn_bit() {
        let mut payload = Vec::new();
        // Parameter message: application value, context length 4, context.
        payload.push(TNS_MSG_TYPE_PARAMETER);
        payload.push(0);
        payload.extend_from_slice(&[2, 0, 4]);
        payload.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
        // Status message: call status 3, then the sequence field.
        payload.push(TNS_MSG_TYPE_STATUS);
        payload.extend_from_slice(&[1, 3]);
        payload.extend_from_slice(&[0]);
        payload.push(TNS_MSG_TYPE_END_OF_RESPONSE);
        let response =
            parse_tpc_switch_response(&payload, ClientCapabilities::default()).expect("decode");
        assert_eq!(response.context, vec![0xDE, 0xAD, 0xBE, 0xEF]);
        assert!(response.txn_in_progress);
    }

    #[test]
    fn switch_response_end_status_clears_txn_bit() {
        let mut payload = Vec::new();
        // Status message: call status 1, then the sequence field.
        payload.push(TNS_MSG_TYPE_STATUS);
        payload.extend_from_slice(&[1, 1]);
        payload.extend_from_slice(&[0]);
        payload.push(TNS_MSG_TYPE_END_OF_RESPONSE);
        let response =
            parse_tpc_switch_response(&payload, ClientCapabilities::default()).expect("decode");
        assert!(!response.txn_in_progress);
    }

    #[test]
    fn change_state_response_reads_out_state() {
        let mut payload = Vec::new();
        // Parameter message: the returned state value.
        payload.push(TNS_MSG_TYPE_PARAMETER);
        payload.extend_from_slice(&[1, 1]);
        // Status message: call status 1, then the sequence field.
        payload.push(TNS_MSG_TYPE_STATUS);
        payload.extend_from_slice(&[1, 1]);
        payload.extend_from_slice(&[0]);
        payload.push(TNS_MSG_TYPE_END_OF_RESPONSE);
        let response = parse_tpc_change_state_response(&payload, ClientCapabilities::default())
            .expect("decode");
        assert_eq!(response.state, TNS_TPC_TXN_STATE_REQUIRES_COMMIT);
        assert!(!response.txn_in_progress);
    }
}