1#![forbid(unsafe_code)]
2
3use super::*;
4use crate::wire::ProtocolLimits;
5
6pub(crate) fn write_tpc_txn_switch_body(
11 writer: &mut TtcWriter,
12 operation: u32,
13 flags: u32,
14 timeout: u32,
15 xid: Option<&[u8]>,
16) {
17 writer.write_ub4(operation);
18 writer.write_u8(0); writer.write_ub4(0); if let Some(global_txn_id) = xid {
21 let mut xid_bytes = global_txn_id.to_vec();
25 xid_bytes.resize(128, 0);
26 writer.write_ub4(SESSIONLESS_FORMAT_ID);
27 writer.write_ub4(u32::try_from(global_txn_id.len()).unwrap_or(0)); writer.write_ub4(0); writer.write_u8(1); writer.write_ub4(u32::try_from(xid_bytes.len()).unwrap_or(0));
31 writer.write_ub4(flags);
32 writer.write_ub4(timeout);
33 writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_raw(&xid_bytes);
41 writer.write_ub4(0); } else {
43 writer.write_ub4(0); writer.write_ub4(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_ub4(flags);
49 writer.write_ub4(timeout);
50 writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_ub4(0); }
59}
60
61pub fn build_tpc_txn_switch_payload_with_seq(
66 seq_num: u8,
67 token_num: u64,
68 operation: u32,
69 flags: u32,
70 timeout: u32,
71 xid: Option<&[u8]>,
72) -> Vec<u8> {
73 let mut writer = TtcWriter::new();
74 writer.write_function_code_with_seq(TNS_FUNC_TPC_TXN_SWITCH, seq_num);
75 writer.write_ub8(token_num);
76 write_tpc_txn_switch_body(&mut writer, operation, flags, timeout, xid);
77 writer.into_bytes()
78}
79
80pub fn build_sessionless_piggyback(
87 seq_num: u8,
88 token_num: u64,
89 operation: u32,
90 flags: u32,
91 timeout: u32,
92 xid: Option<&[u8]>,
93) -> Vec<u8> {
94 let mut writer = TtcWriter::new();
95 writer.write_u8(TNS_MSG_TYPE_PIGGYBACK);
96 writer.write_u8(TNS_FUNC_TPC_TXN_SWITCH);
97 writer.write_u8(seq_num);
98 writer.write_ub8(token_num);
99 write_tpc_txn_switch_body(&mut writer, operation, flags, timeout, xid);
100 writer.into_bytes()
101}
102
103pub fn decode_sessionless_txn_state(binary: &[u8]) -> Result<Option<SessionlessTxnState>> {
108 if binary.len() < 2 {
109 return Err(ProtocolError::TtcDecode("short sessionless txn state"));
110 }
111 let state = binary[binary.len() - 2];
112 let sync_version = binary[binary.len() - 1];
113 if sync_version != 1 {
114 return Err(ProtocolError::TtcDecode("unknown transaction sync version"));
115 }
116 if state & TNS_TPC_TXNID_SYNC_UNSET != 0 {
117 Ok(Some(SessionlessTxnState::Unset))
118 } else if state & TNS_TPC_TXNID_SYNC_SET != 0 {
119 Ok(Some(SessionlessTxnState::Set {
120 started_on_server: state & TNS_TPC_TXNID_SYNC_SERVER != 0,
121 }))
122 } else {
123 Ok(None)
124 }
125}
126
127pub fn parse_tpc_txn_switch_response(
132 payload: &[u8],
133 capabilities: ClientCapabilities,
134) -> Result<Option<SessionlessTxnState>> {
135 parse_tpc_txn_switch_response_with_limits(payload, capabilities, ProtocolLimits::DEFAULT)
136}
137
138pub fn parse_tpc_txn_switch_response_with_limits(
139 payload: &[u8],
140 capabilities: ClientCapabilities,
141 limits: ProtocolLimits,
142) -> Result<Option<SessionlessTxnState>> {
143 let mut reader = TtcReader::with_limits(payload, limits)?;
144 let mut state = None;
145 while reader.remaining() > 0 {
146 let message_type = reader.read_u8()?;
147 match message_type {
148 0 => {}
149 TNS_MSG_TYPE_STATUS => {
150 let _call_status = reader.read_ub4()?;
151 let _seq = reader.read_ub2()?;
152 }
153 TNS_MSG_TYPE_PARAMETER => {
154 let _application_value = reader.read_ub4()?;
157 let context_len = reader.read_ub2()?;
158 if context_len > 0 {
159 reader.skip(usize::from(context_len))?;
160 }
161 }
162 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
163 if let Some(update) = skip_server_side_piggyback(&mut reader)? {
164 state = Some(update);
165 }
166 }
167 TNS_MSG_TYPE_END_OF_RESPONSE => break,
168 TNS_MSG_TYPE_ERROR => {
169 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
170 if info.number != 0 {
171 return Err(ProtocolError::ServerErrorInfo(Box::new(
172 info.into_details(),
173 )));
174 }
175 }
176 _ => break,
177 }
178 }
179 Ok(state)
180}
181
182pub fn build_begin_pipeline_piggyback(seq_num: u8, token_num: u64, pipeline_mode: u8) -> Vec<u8> {
190 let mut writer = TtcWriter::new();
191 writer.write_u8(TNS_MSG_TYPE_PIGGYBACK);
192 writer.write_u8(TNS_FUNC_PIPELINE_BEGIN);
193 writer.write_u8(seq_num);
194 writer.write_ub8(token_num);
195 writer.write_ub2(0); writer.write_u8(0); writer.write_u8(pipeline_mode);
198 writer.into_bytes()
199}
200
201pub fn build_end_pipeline_payload_with_seq(seq_num: u8) -> Vec<u8> {
206 let mut writer = TtcWriter::new();
207 writer.write_function_code_with_seq(TNS_FUNC_PIPELINE_END, seq_num);
208 writer.write_ub8(0); writer.write_ub4(0); writer.into_bytes()
211}
212
/// Borrowed view of a transaction identifier (XID) used by the TPC request
/// builders in this module.
///
/// The struct only holds a `u32` and two borrowed slices, so it is cheap to
/// copy and can be compared and hashed structurally.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct TpcXid<'a> {
    /// Format identifier written first in the XID descriptor.
    pub format_id: u32,
    /// Global transaction id bytes; placed first in the 128-byte XID block.
    pub global_transaction_id: &'a [u8],
    /// Branch qualifier bytes; placed right after the global transaction id
    /// in the 128-byte XID block.
    pub branch_qualifier: &'a [u8],
}
222
223fn write_xid_descriptor(writer: &mut TtcWriter, xid: Option<&TpcXid<'_>>) {
230 match xid {
231 Some(xid) => {
232 writer.write_ub4(xid.format_id);
233 writer.write_ub4(u32::try_from(xid.global_transaction_id.len()).unwrap_or(0));
234 writer.write_ub4(u32::try_from(xid.branch_qualifier.len()).unwrap_or(0));
235 writer.write_u8(1); writer.write_ub4(128); }
238 None => {
239 writer.write_ub4(0); writer.write_ub4(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); }
245 }
246}
247
248fn write_xid_block_bytes(writer: &mut TtcWriter, xid: &TpcXid<'_>) {
251 let mut xid_bytes = Vec::with_capacity(128);
252 xid_bytes.extend_from_slice(xid.global_transaction_id);
253 xid_bytes.extend_from_slice(xid.branch_qualifier);
254 xid_bytes.resize(128, 0);
255 writer.write_raw(&xid_bytes);
256}
257
258pub fn build_tpc_switch_payload_with_seq(
265 seq_num: u8,
266 operation: u32,
267 flags: u32,
268 timeout: u32,
269 xid: Option<&TpcXid<'_>>,
270 context: Option<&[u8]>,
271) -> Vec<u8> {
272 let mut writer = TtcWriter::new();
273 writer.write_function_code_with_seq(TNS_FUNC_TPC_TXN_SWITCH, seq_num);
274 writer.write_ub8(0); writer.write_ub4(operation);
276 match context {
277 Some(context) => {
278 writer.write_u8(1); writer.write_ub4(u32::try_from(context.len()).unwrap_or(0));
280 }
281 None => {
282 writer.write_u8(0); writer.write_ub4(0); }
285 }
286 write_xid_descriptor(&mut writer, xid);
287 writer.write_ub4(flags);
288 writer.write_ub4(timeout);
289 writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); if let Some(context) = context {
297 writer.write_raw(context);
298 }
299 if let Some(xid) = xid {
300 write_xid_block_bytes(&mut writer, xid);
301 }
302 writer.write_ub4(0); writer.into_bytes()
304}
305
306pub fn build_tpc_change_state_payload_with_seq(
312 seq_num: u8,
313 operation: u32,
314 requested_state: u32,
315 flags: u32,
316 xid: Option<&TpcXid<'_>>,
317 context: Option<&[u8]>,
318) -> Vec<u8> {
319 let mut writer = TtcWriter::new();
320 writer.write_function_code_with_seq(TNS_FUNC_TPC_TXN_CHANGE_STATE, seq_num);
321 writer.write_ub8(0); writer.write_ub4(operation);
323 match context {
324 Some(context) => {
325 writer.write_u8(1); writer.write_ub4(u32::try_from(context.len()).unwrap_or(0));
327 }
328 None => {
329 writer.write_u8(0); writer.write_ub4(0); }
332 }
333 write_xid_descriptor(&mut writer, xid);
334 writer.write_ub4(0); writer.write_ub4(requested_state);
336 writer.write_u8(1); writer.write_ub4(flags);
338 if let Some(context) = context {
339 writer.write_raw(context);
340 }
341 if let Some(xid) = xid {
342 write_xid_block_bytes(&mut writer, xid);
343 }
344 writer.into_bytes()
345}
346
347pub fn parse_tpc_switch_response(
352 payload: &[u8],
353 capabilities: ClientCapabilities,
354) -> Result<TpcSwitchResponse> {
355 parse_tpc_switch_response_with_limits(payload, capabilities, ProtocolLimits::DEFAULT)
356}
357
358pub fn parse_tpc_switch_response_with_limits(
359 payload: &[u8],
360 capabilities: ClientCapabilities,
361 limits: ProtocolLimits,
362) -> Result<TpcSwitchResponse> {
363 let mut reader = TtcReader::with_limits(payload, limits)?;
364 let mut response = TpcSwitchResponse::default();
365 while reader.remaining() > 0 {
366 let message_type = reader.read_u8()?;
367 match message_type {
368 0 => {}
369 TNS_MSG_TYPE_STATUS => {
370 let call_status = reader.read_ub4()?;
371 let _seq = reader.read_ub2()?;
372 response.txn_in_progress = call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0;
373 }
374 TNS_MSG_TYPE_PARAMETER => {
375 let _application_value = reader.read_ub4()?;
378 let context_len = reader.read_ub2()?;
379 let context = reader.read_raw(usize::from(context_len))?;
380 response.context = context.to_vec();
381 }
382 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
383 if let Some(update) = skip_server_side_piggyback(&mut reader)? {
384 response.sessionless_state = Some(update);
385 }
386 }
387 TNS_MSG_TYPE_END_OF_RESPONSE => break,
388 TNS_MSG_TYPE_ERROR => {
389 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
390 if info.number != 0 {
391 return Err(ProtocolError::ServerErrorInfo(Box::new(
396 info.into_details(),
397 )));
398 }
399 response.txn_in_progress = info.call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0;
402 }
403 _ => break,
404 }
405 }
406 Ok(response)
407}
408
409pub fn parse_tpc_change_state_response(
414 payload: &[u8],
415 capabilities: ClientCapabilities,
416) -> Result<TpcChangeStateResponse> {
417 parse_tpc_change_state_response_with_limits(payload, capabilities, ProtocolLimits::DEFAULT)
418}
419
420pub fn parse_tpc_change_state_response_with_limits(
421 payload: &[u8],
422 capabilities: ClientCapabilities,
423 limits: ProtocolLimits,
424) -> Result<TpcChangeStateResponse> {
425 let mut reader = TtcReader::with_limits(payload, limits)?;
426 let mut response = TpcChangeStateResponse::default();
427 while reader.remaining() > 0 {
428 let message_type = reader.read_u8()?;
429 match message_type {
430 0 => {}
431 TNS_MSG_TYPE_STATUS => {
432 let call_status = reader.read_ub4()?;
433 let _seq = reader.read_ub2()?;
434 response.txn_in_progress = call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0;
435 }
436 TNS_MSG_TYPE_PARAMETER => {
437 response.state = reader.read_ub4()?;
440 }
441 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
442 skip_server_side_piggyback(&mut reader)?;
443 }
444 TNS_MSG_TYPE_END_OF_RESPONSE => break,
445 TNS_MSG_TYPE_ERROR => {
446 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
447 if info.number != 0 {
448 return Err(ProtocolError::ServerErrorInfo(Box::new(
453 info.into_details(),
454 )));
455 }
456 response.txn_in_progress = info.call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0;
459 }
460 _ => break,
461 }
462 }
463 Ok(response)
464}
465
466pub(crate) fn skip_keyword_value_pairs(reader: &mut TtcReader<'_>, num_pairs: u16) -> Result<()> {
467 read_keyword_value_pairs_for_txn_state(reader, num_pairs).map(|_| ())
468}
469
470pub(crate) fn read_keyword_value_pairs_for_txn_state(
475 reader: &mut TtcReader<'_>,
476 num_pairs: u16,
477) -> Result<Option<SessionlessTxnState>> {
478 let mut state = None;
479 for _ in 0..num_pairs {
480 if reader.read_ub2()? > 0 {
481 let _text_value = reader.read_bytes()?;
482 }
483 let mut binary_value = None;
484 if reader.read_ub2()? > 0 {
485 binary_value = reader.read_bytes()?;
486 }
487 let keyword_num = reader.read_ub2()?;
488 if keyword_num == TNS_KEYWORD_NUM_TRANSACTION_ID {
489 if let Some(binary) = binary_value.as_deref() {
490 if let Some(update) = decode_sessionless_txn_state(binary)? {
491 state = Some(update);
492 }
493 }
494 }
495 }
496 Ok(state)
497}
498
#[cfg(test)]
mod tpc_tests {
    //! Byte-level checks for the TPC request builders and response parsers.

    use super::*;

    /// Sample global transaction id (7 bytes) and branch qualifier (8 bytes).
    fn xid() -> ([u8; 7], [u8; 8]) {
        (*b"txn4400", *b"branchId")
    }

    #[test]
    fn tpc_begin_payload_encodes_format_branch_and_128_byte_xid() {
        let (gtid, bqual) = xid();
        let tpc_xid = TpcXid {
            format_id: 4400,
            global_transaction_id: &gtid,
            branch_qualifier: &bqual,
        };
        let payload = build_tpc_switch_payload_with_seq(
            4,
            TNS_TPC_TXN_START,
            TPC_TXN_FLAGS_NEW,
            0,
            Some(&tpc_xid),
            None,
        );
        assert_eq!(&payload[..3], &[3, TNS_FUNC_TPC_TXN_SWITCH, 4]);
        // The body starts after the three header bytes and the byte that
        // follows them.
        let body = &payload[4..];
        // Operation, then the "no context" descriptor.
        assert_eq!(&body[..4], &[1, 1, 0, 0]);
        // Format id 4400 == 0x1130.
        assert_eq!(&body[4..7], &[2, 0x11, 0x30]);
        // Lengths 7 and 8, the `1` byte, and the block length 128 (0x80).
        assert_eq!(&body[7..14], &[1, 7, 1, 8, 1, 1, 0x80]);
        // The 128-byte XID block sits just before the final trailing byte.
        let block_start = payload.len() - 128 - 1;
        let block = &payload[block_start..block_start + 128];
        assert_eq!(&block[..7], b"txn4400");
        assert_eq!(&block[7..15], b"branchId");
        assert!(block[15..].iter().all(|&byte| byte == 0));
    }

    #[test]
    fn tpc_end_payload_echoes_context() {
        let context = vec![0xAAu8; 168];
        let payload =
            build_tpc_switch_payload_with_seq(7, TNS_TPC_TXN_DETACH, 0, 0, None, Some(&context));
        let body = &payload[4..];
        // Operation, context-present byte, then the context length 168 (0xA8).
        assert_eq!(&body[..5], &[1, 2, 1, 1, 0xA8]);
        // The context bytes must appear verbatim somewhere in the payload.
        assert!(payload
            .windows(context.len())
            .any(|window| window == context.as_slice()));
    }

    #[test]
    fn change_state_prepare_payload_shape() {
        let (gtid, bqual) = xid();
        let tpc_xid = TpcXid {
            format_id: 4400,
            global_transaction_id: &gtid,
            branch_qualifier: &bqual,
        };
        let payload = build_tpc_change_state_payload_with_seq(
            8,
            TNS_TPC_TXN_PREPARE,
            TNS_TPC_TXN_STATE_PREPARE,
            0,
            Some(&tpc_xid),
            None,
        );
        assert_eq!(&payload[..3], &[3, TNS_FUNC_TPC_TXN_CHANGE_STATE, 8]);
        let body = &payload[4..];
        // Operation, then the "no context" descriptor.
        assert_eq!(&body[..4], &[1, 3, 0, 0]);
    }

    #[test]
    fn switch_response_captures_context_and_txn_bit() {
        let mut payload = Vec::new();
        // Parameter message: zero application value, context length 4, then
        // the four context bytes.
        payload.push(TNS_MSG_TYPE_PARAMETER);
        payload.push(0);
        payload.extend_from_slice(&[2, 0, 4]);
        payload.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
        // Status message: call status 3, sequence 0.
        payload.push(TNS_MSG_TYPE_STATUS);
        payload.extend_from_slice(&[1, 3]);
        payload.extend_from_slice(&[0]);
        payload.push(TNS_MSG_TYPE_END_OF_RESPONSE);

        let response =
            parse_tpc_switch_response(&payload, ClientCapabilities::default()).expect("decode");
        assert_eq!(response.context, vec![0xDE, 0xAD, 0xBE, 0xEF]);
        assert!(response.txn_in_progress);
    }

    #[test]
    fn switch_response_end_status_clears_txn_bit() {
        let mut payload = Vec::new();
        // Status message: call status 1, sequence 0.
        payload.push(TNS_MSG_TYPE_STATUS);
        payload.extend_from_slice(&[1, 1]);
        payload.extend_from_slice(&[0]);
        payload.push(TNS_MSG_TYPE_END_OF_RESPONSE);

        let response =
            parse_tpc_switch_response(&payload, ClientCapabilities::default()).expect("decode");
        assert!(!response.txn_in_progress);
    }

    #[test]
    fn change_state_response_reads_out_state() {
        let mut payload = Vec::new();
        // Parameter message carrying the resulting state value 1.
        payload.push(TNS_MSG_TYPE_PARAMETER);
        payload.extend_from_slice(&[1, 1]);
        // Status message: call status 1, sequence 0.
        payload.push(TNS_MSG_TYPE_STATUS);
        payload.extend_from_slice(&[1, 1]);
        payload.extend_from_slice(&[0]);
        payload.push(TNS_MSG_TYPE_END_OF_RESPONSE);

        let response = parse_tpc_change_state_response(&payload, ClientCapabilities::default())
            .expect("decode");
        assert_eq!(response.state, TNS_TPC_TXN_STATE_REQUIRES_COMMIT);
        assert!(!response.txn_in_progress);
    }
}