1#![forbid(unsafe_code)]
2
3use super::*;
4
5pub(crate) fn write_tpc_txn_switch_body(
10 writer: &mut TtcWriter,
11 operation: u32,
12 flags: u32,
13 timeout: u32,
14 xid: Option<&[u8]>,
15) {
16 writer.write_ub4(operation);
17 writer.write_u8(0); writer.write_ub4(0); if let Some(global_txn_id) = xid {
20 let mut xid_bytes = global_txn_id.to_vec();
24 xid_bytes.resize(128, 0);
25 writer.write_ub4(SESSIONLESS_FORMAT_ID);
26 writer.write_ub4(u32::try_from(global_txn_id.len()).unwrap_or(0)); writer.write_ub4(0); writer.write_u8(1); writer.write_ub4(u32::try_from(xid_bytes.len()).unwrap_or(0));
30 writer.write_ub4(flags);
31 writer.write_ub4(timeout);
32 writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_raw(&xid_bytes);
40 writer.write_ub4(0); } else {
42 writer.write_ub4(0); writer.write_ub4(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_ub4(flags);
48 writer.write_ub4(timeout);
49 writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_ub4(0); }
58}
59
60pub fn build_tpc_txn_switch_payload_with_seq(
65 seq_num: u8,
66 token_num: u64,
67 operation: u32,
68 flags: u32,
69 timeout: u32,
70 xid: Option<&[u8]>,
71) -> Vec<u8> {
72 let mut writer = TtcWriter::new();
73 writer.write_function_code_with_seq(TNS_FUNC_TPC_TXN_SWITCH, seq_num);
74 writer.write_ub8(token_num);
75 write_tpc_txn_switch_body(&mut writer, operation, flags, timeout, xid);
76 writer.into_bytes()
77}
78
79pub fn build_sessionless_piggyback(
86 seq_num: u8,
87 token_num: u64,
88 operation: u32,
89 flags: u32,
90 timeout: u32,
91 xid: Option<&[u8]>,
92) -> Vec<u8> {
93 let mut writer = TtcWriter::new();
94 writer.write_u8(TNS_MSG_TYPE_PIGGYBACK);
95 writer.write_u8(TNS_FUNC_TPC_TXN_SWITCH);
96 writer.write_u8(seq_num);
97 writer.write_ub8(token_num);
98 write_tpc_txn_switch_body(&mut writer, operation, flags, timeout, xid);
99 writer.into_bytes()
100}
101
102pub fn decode_sessionless_txn_state(binary: &[u8]) -> Result<Option<SessionlessTxnState>> {
107 if binary.len() < 2 {
108 return Err(ProtocolError::TtcDecode("short sessionless txn state"));
109 }
110 let state = binary[binary.len() - 2];
111 let sync_version = binary[binary.len() - 1];
112 if sync_version != 1 {
113 return Err(ProtocolError::TtcDecode("unknown transaction sync version"));
114 }
115 if state & TNS_TPC_TXNID_SYNC_UNSET != 0 {
116 Ok(Some(SessionlessTxnState::Unset))
117 } else if state & TNS_TPC_TXNID_SYNC_SET != 0 {
118 Ok(Some(SessionlessTxnState::Set {
119 started_on_server: state & TNS_TPC_TXNID_SYNC_SERVER != 0,
120 }))
121 } else {
122 Ok(None)
123 }
124}
125
126pub fn parse_tpc_txn_switch_response(
131 payload: &[u8],
132 capabilities: ClientCapabilities,
133) -> Result<Option<SessionlessTxnState>> {
134 let mut reader = TtcReader::new(payload);
135 let mut state = None;
136 while reader.remaining() > 0 {
137 let message_type = reader.read_u8()?;
138 match message_type {
139 0 => {}
140 TNS_MSG_TYPE_STATUS => {
141 let _call_status = reader.read_ub4()?;
142 let _seq = reader.read_ub2()?;
143 }
144 TNS_MSG_TYPE_PARAMETER => {
145 let _application_value = reader.read_ub4()?;
148 let context_len = reader.read_ub2()?;
149 if context_len > 0 {
150 reader.skip(usize::from(context_len))?;
151 }
152 }
153 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
154 if let Some(update) = skip_server_side_piggyback(&mut reader)? {
155 state = Some(update);
156 }
157 }
158 TNS_MSG_TYPE_END_OF_RESPONSE => break,
159 TNS_MSG_TYPE_ERROR => {
160 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
161 if info.number != 0 {
162 return Err(ProtocolError::ServerErrorInfo(Box::new(
163 info.into_details(),
164 )));
165 }
166 }
167 _ => break,
168 }
169 }
170 Ok(state)
171}
172
173pub fn build_begin_pipeline_piggyback(seq_num: u8, token_num: u64, pipeline_mode: u8) -> Vec<u8> {
181 let mut writer = TtcWriter::new();
182 writer.write_u8(TNS_MSG_TYPE_PIGGYBACK);
183 writer.write_u8(TNS_FUNC_PIPELINE_BEGIN);
184 writer.write_u8(seq_num);
185 writer.write_ub8(token_num);
186 writer.write_ub2(0); writer.write_u8(0); writer.write_u8(pipeline_mode);
189 writer.into_bytes()
190}
191
192pub fn build_end_pipeline_payload_with_seq(seq_num: u8) -> Vec<u8> {
197 let mut writer = TtcWriter::new();
198 writer.write_function_code_with_seq(TNS_FUNC_PIPELINE_END, seq_num);
199 writer.write_ub8(0); writer.write_ub4(0); writer.into_bytes()
202}
203
204#[derive(Clone, Debug)]
208pub struct TpcXid<'a> {
209 pub format_id: u32,
210 pub global_transaction_id: &'a [u8],
211 pub branch_qualifier: &'a [u8],
212}
213
214fn write_xid_descriptor(writer: &mut TtcWriter, xid: Option<&TpcXid<'_>>) {
221 match xid {
222 Some(xid) => {
223 writer.write_ub4(xid.format_id);
224 writer.write_ub4(u32::try_from(xid.global_transaction_id.len()).unwrap_or(0));
225 writer.write_ub4(u32::try_from(xid.branch_qualifier.len()).unwrap_or(0));
226 writer.write_u8(1); writer.write_ub4(128); }
229 None => {
230 writer.write_ub4(0); writer.write_ub4(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); }
236 }
237}
238
239fn write_xid_block_bytes(writer: &mut TtcWriter, xid: &TpcXid<'_>) {
242 let mut xid_bytes = Vec::with_capacity(128);
243 xid_bytes.extend_from_slice(xid.global_transaction_id);
244 xid_bytes.extend_from_slice(xid.branch_qualifier);
245 xid_bytes.resize(128, 0);
246 writer.write_raw(&xid_bytes);
247}
248
249pub fn build_tpc_switch_payload_with_seq(
256 seq_num: u8,
257 operation: u32,
258 flags: u32,
259 timeout: u32,
260 xid: Option<&TpcXid<'_>>,
261 context: Option<&[u8]>,
262) -> Vec<u8> {
263 let mut writer = TtcWriter::new();
264 writer.write_function_code_with_seq(TNS_FUNC_TPC_TXN_SWITCH, seq_num);
265 writer.write_ub8(0); writer.write_ub4(operation);
267 match context {
268 Some(context) => {
269 writer.write_u8(1); writer.write_ub4(u32::try_from(context.len()).unwrap_or(0));
271 }
272 None => {
273 writer.write_u8(0); writer.write_ub4(0); }
276 }
277 write_xid_descriptor(&mut writer, xid);
278 writer.write_ub4(flags);
279 writer.write_ub4(timeout);
280 writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); if let Some(context) = context {
288 writer.write_raw(context);
289 }
290 if let Some(xid) = xid {
291 write_xid_block_bytes(&mut writer, xid);
292 }
293 writer.write_ub4(0); writer.into_bytes()
295}
296
297pub fn build_tpc_change_state_payload_with_seq(
303 seq_num: u8,
304 operation: u32,
305 requested_state: u32,
306 flags: u32,
307 xid: Option<&TpcXid<'_>>,
308 context: Option<&[u8]>,
309) -> Vec<u8> {
310 let mut writer = TtcWriter::new();
311 writer.write_function_code_with_seq(TNS_FUNC_TPC_TXN_CHANGE_STATE, seq_num);
312 writer.write_ub8(0); writer.write_ub4(operation);
314 match context {
315 Some(context) => {
316 writer.write_u8(1); writer.write_ub4(u32::try_from(context.len()).unwrap_or(0));
318 }
319 None => {
320 writer.write_u8(0); writer.write_ub4(0); }
323 }
324 write_xid_descriptor(&mut writer, xid);
325 writer.write_ub4(0); writer.write_ub4(requested_state);
327 writer.write_u8(1); writer.write_ub4(flags);
329 if let Some(context) = context {
330 writer.write_raw(context);
331 }
332 if let Some(xid) = xid {
333 write_xid_block_bytes(&mut writer, xid);
334 }
335 writer.into_bytes()
336}
337
338pub fn parse_tpc_switch_response(
343 payload: &[u8],
344 capabilities: ClientCapabilities,
345) -> Result<TpcSwitchResponse> {
346 let mut reader = TtcReader::new(payload);
347 let mut response = TpcSwitchResponse::default();
348 while reader.remaining() > 0 {
349 let message_type = reader.read_u8()?;
350 match message_type {
351 0 => {}
352 TNS_MSG_TYPE_STATUS => {
353 let call_status = reader.read_ub4()?;
354 let _seq = reader.read_ub2()?;
355 response.txn_in_progress = call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0;
356 }
357 TNS_MSG_TYPE_PARAMETER => {
358 let _application_value = reader.read_ub4()?;
361 let context_len = reader.read_ub2()?;
362 let context = reader.read_raw(usize::from(context_len))?;
363 response.context = context.to_vec();
364 }
365 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
366 if let Some(update) = skip_server_side_piggyback(&mut reader)? {
367 response.sessionless_state = Some(update);
368 }
369 }
370 TNS_MSG_TYPE_END_OF_RESPONSE => break,
371 TNS_MSG_TYPE_ERROR => {
372 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
373 if info.number != 0 {
374 return Err(ProtocolError::ServerErrorInfo(Box::new(
379 info.into_details(),
380 )));
381 }
382 response.txn_in_progress = info.call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0;
385 }
386 _ => break,
387 }
388 }
389 Ok(response)
390}
391
392pub fn parse_tpc_change_state_response(
397 payload: &[u8],
398 capabilities: ClientCapabilities,
399) -> Result<TpcChangeStateResponse> {
400 let mut reader = TtcReader::new(payload);
401 let mut response = TpcChangeStateResponse::default();
402 while reader.remaining() > 0 {
403 let message_type = reader.read_u8()?;
404 match message_type {
405 0 => {}
406 TNS_MSG_TYPE_STATUS => {
407 let call_status = reader.read_ub4()?;
408 let _seq = reader.read_ub2()?;
409 response.txn_in_progress = call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0;
410 }
411 TNS_MSG_TYPE_PARAMETER => {
412 response.state = reader.read_ub4()?;
415 }
416 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
417 skip_server_side_piggyback(&mut reader)?;
418 }
419 TNS_MSG_TYPE_END_OF_RESPONSE => break,
420 TNS_MSG_TYPE_ERROR => {
421 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
422 if info.number != 0 {
423 return Err(ProtocolError::ServerErrorInfo(Box::new(
428 info.into_details(),
429 )));
430 }
431 response.txn_in_progress = info.call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0;
434 }
435 _ => break,
436 }
437 }
438 Ok(response)
439}
440
441pub(crate) fn skip_keyword_value_pairs(reader: &mut TtcReader<'_>, num_pairs: u16) -> Result<()> {
442 read_keyword_value_pairs_for_txn_state(reader, num_pairs).map(|_| ())
443}
444
445pub(crate) fn read_keyword_value_pairs_for_txn_state(
450 reader: &mut TtcReader<'_>,
451 num_pairs: u16,
452) -> Result<Option<SessionlessTxnState>> {
453 let mut state = None;
454 for _ in 0..num_pairs {
455 if reader.read_ub2()? > 0 {
456 let _text_value = reader.read_bytes()?;
457 }
458 let mut binary_value = None;
459 if reader.read_ub2()? > 0 {
460 binary_value = reader.read_bytes()?;
461 }
462 let keyword_num = reader.read_ub2()?;
463 if keyword_num == TNS_KEYWORD_NUM_TRANSACTION_ID {
464 if let Some(binary) = binary_value.as_deref() {
465 if let Some(update) = decode_sessionless_txn_state(binary)? {
466 state = Some(update);
467 }
468 }
469 }
470 }
471 Ok(state)
472}
473
474#[cfg(test)]
475mod tpc_tests {
476 use super::*;
477
478 fn xid() -> ([u8; 7], [u8; 8]) {
479 (*b"txn4400", *b"branchId")
480 }
481
482 #[test]
483 fn tpc_begin_payload_encodes_format_branch_and_128_byte_xid() {
484 let (gtid, bqual) = xid();
485 let tpc_xid = TpcXid {
486 format_id: 4400,
487 global_transaction_id: >id,
488 branch_qualifier: &bqual,
489 };
490 let payload = build_tpc_switch_payload_with_seq(
491 4,
492 TNS_TPC_TXN_START,
493 TPC_TXN_FLAGS_NEW,
494 0,
495 Some(&tpc_xid),
496 None,
497 );
498 assert_eq!(&payload[..3], &[3, TNS_FUNC_TPC_TXN_SWITCH, 4]);
500 let body = &payload[4..];
501 assert_eq!(&body[..4], &[1, 1, 0, 0]);
503 assert_eq!(&body[4..7], &[2, 0x11, 0x30]);
505 assert_eq!(&body[7..14], &[1, 7, 1, 8, 1, 1, 0x80]);
508 let block_start = payload.len() - 128 - 1;
511 let block = &payload[block_start..block_start + 128];
512 assert_eq!(&block[..7], b"txn4400");
513 assert_eq!(&block[7..15], b"branchId");
514 assert!(block[15..].iter().all(|&byte| byte == 0));
515 }
516
517 #[test]
518 fn tpc_end_payload_echoes_context() {
519 let context = vec![0xAAu8; 168];
520 let payload =
521 build_tpc_switch_payload_with_seq(7, TNS_TPC_TXN_DETACH, 0, 0, None, Some(&context));
522 let body = &payload[4..];
523 assert_eq!(&body[..5], &[1, 2, 1, 1, 0xA8]);
525 assert!(payload
527 .windows(context.len())
528 .any(|window| window == context.as_slice()));
529 }
530
531 #[test]
532 fn change_state_prepare_payload_shape() {
533 let (gtid, bqual) = xid();
534 let tpc_xid = TpcXid {
535 format_id: 4400,
536 global_transaction_id: >id,
537 branch_qualifier: &bqual,
538 };
539 let payload = build_tpc_change_state_payload_with_seq(
540 8,
541 TNS_TPC_TXN_PREPARE,
542 TNS_TPC_TXN_STATE_PREPARE,
543 0,
544 Some(&tpc_xid),
545 None,
546 );
547 assert_eq!(&payload[..3], &[3, TNS_FUNC_TPC_TXN_CHANGE_STATE, 8]);
548 let body = &payload[4..];
549 assert_eq!(&body[..4], &[1, 3, 0, 0]);
551 }
552
553 #[test]
554 fn switch_response_captures_context_and_txn_bit() {
555 let mut payload = Vec::new();
558 payload.push(TNS_MSG_TYPE_PARAMETER);
559 payload.push(0); payload.extend_from_slice(&[2, 0, 4]); payload.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
562 payload.push(TNS_MSG_TYPE_STATUS);
563 payload.extend_from_slice(&[1, 3]); payload.extend_from_slice(&[0]); payload.push(TNS_MSG_TYPE_END_OF_RESPONSE);
566
567 let response =
568 parse_tpc_switch_response(&payload, ClientCapabilities::default()).expect("decode");
569 assert_eq!(response.context, vec![0xDE, 0xAD, 0xBE, 0xEF]);
570 assert!(response.txn_in_progress);
571 }
572
573 #[test]
574 fn switch_response_end_status_clears_txn_bit() {
575 let mut payload = Vec::new();
577 payload.push(TNS_MSG_TYPE_STATUS);
578 payload.extend_from_slice(&[1, 1]); payload.extend_from_slice(&[0]); payload.push(TNS_MSG_TYPE_END_OF_RESPONSE);
581
582 let response =
583 parse_tpc_switch_response(&payload, ClientCapabilities::default()).expect("decode");
584 assert!(!response.txn_in_progress);
585 }
586
587 #[test]
588 fn change_state_response_reads_out_state() {
589 let mut payload = Vec::new();
591 payload.push(TNS_MSG_TYPE_PARAMETER);
592 payload.extend_from_slice(&[1, 1]); payload.push(TNS_MSG_TYPE_STATUS);
594 payload.extend_from_slice(&[1, 1]); payload.extend_from_slice(&[0]); payload.push(TNS_MSG_TYPE_END_OF_RESPONSE);
597
598 let response = parse_tpc_change_state_response(&payload, ClientCapabilities::default())
599 .expect("decode");
600 assert_eq!(response.state, TNS_TPC_TXN_STATE_REQUIRES_COMMIT);
601 assert!(!response.txn_in_progress);
602 }
603}