1use serde::{Deserialize, Serialize};
27use std::collections::HashSet;
28use std::sync::atomic::{AtomicU64, Ordering};
29
/// Unique identifier of a transaction.
///
/// `TransactionManager` hands these out from a counter that starts at 1;
/// `TxnWalEntry::new_checkpoint_end` uses 0 for records that belong to no transaction.
pub type TxnId = u64;
32
/// Lifecycle state of a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TxnState {
    /// Initial state set by `Transaction::new`.
    Active,
    /// Set by `TransactionManager::mark_committed`.
    Committed,
    /// Set by `TransactionManager::mark_aborted` and `mark_conflict_abort`.
    Aborted,
}
40
/// Type tag of a WAL record.
///
/// The enum is `#[repr(u8)]`: the discriminant is the first byte of the payload produced by
/// `TxnWalEntry::serialize_for_checksum`, and the `TryFrom<u8>` impl maps that byte back to a
/// variant. Changing an existing discriminant therefore changes how existing records decode.
///
/// NOTE(review): only the variants with a documented constructor below are produced by code
/// in this file; the intended meaning of the others is inferred from their names only.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum WalRecordType {
    /// Key/value write; built by `TxnWalEntry::new_data` and `TxnWalEntry::new_aries_data`.
    Data = 0x01,
    PageUpdate = 0x02,
    Delete = 0x03,
    /// Built by `TxnWalEntry::new_begin`.
    TxnBegin = 0x10,
    /// Built by `TxnWalEntry::new_commit`.
    TxnCommit = 0x11,
    /// Built by `TxnWalEntry::new_abort`.
    TxnAbort = 0x12,
    Savepoint = 0x13,
    RollbackToSavepoint = 0x14,
    TxnEnd = 0x15,
    Checkpoint = 0x20,
    /// Built by `TxnWalEntry::new_checkpoint_end`; its value holds bincode-encoded
    /// `AriesCheckpointData`.
    CheckpointEnd = 0x21,
    SchemaChange = 0x30,
    /// Built by `TxnWalEntry::new_clr`.
    CompensationLogRecord = 0x40,
    Compaction = 0x50,
    Flush = 0x51,
    BatchBegin = 0x60,
    BatchCommit = 0x61,
}
94
95impl TryFrom<u8> for WalRecordType {
96 type Error = ();
97
98 fn try_from(value: u8) -> Result<Self, Self::Error> {
99 match value {
100 0x01 => Ok(WalRecordType::Data),
101 0x02 => Ok(WalRecordType::PageUpdate),
102 0x03 => Ok(WalRecordType::Delete),
103 0x10 => Ok(WalRecordType::TxnBegin),
104 0x11 => Ok(WalRecordType::TxnCommit),
105 0x12 => Ok(WalRecordType::TxnAbort),
106 0x13 => Ok(WalRecordType::Savepoint),
107 0x14 => Ok(WalRecordType::RollbackToSavepoint),
108 0x15 => Ok(WalRecordType::TxnEnd),
109 0x20 => Ok(WalRecordType::Checkpoint),
110 0x21 => Ok(WalRecordType::CheckpointEnd),
111 0x30 => Ok(WalRecordType::SchemaChange),
112 0x40 => Ok(WalRecordType::CompensationLogRecord),
113 0x50 => Ok(WalRecordType::Compaction),
114 0x51 => Ok(WalRecordType::Flush),
115 0x60 => Ok(WalRecordType::BatchBegin),
116 0x61 => Ok(WalRecordType::BatchCommit),
117 _ => Err(()),
118 }
119 }
120}
121
/// Log sequence number of a WAL record.
///
/// Every `TxnWalEntry` constructor in this file leaves `lsn` at 0.
/// NOTE(review): presumably the WAL writer assigns the real value before the checksum is
/// computed — confirm against the caller.
pub type Lsn = u64;

/// Identifier of a storage page referenced by ARIES-style WAL records and dirty-page entries.
pub type PageId = u64;
132
/// One row of the transaction table stored in [`AriesCheckpointData`].
///
/// NOTE(review): field meanings below follow ARIES naming; this file only stores the values
/// and never interprets them — confirm against the recovery code.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AriesTransactionEntry {
    /// Transaction this row describes.
    pub txn_id: TxnId,
    /// State of the transaction when the checkpoint was taken.
    pub state: TxnState,
    /// Presumably the LSN of the transaction's most recent log record.
    pub last_lsn: Lsn,
    /// Presumably the next LSN to undo for this transaction, if any.
    pub undo_next_lsn: Option<Lsn>,
}
145
/// One row of the dirty-page table stored in [`AriesCheckpointData`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AriesDirtyPageEntry {
    /// Page this row describes.
    pub page_id: PageId,
    /// NOTE(review): presumably the ARIES `recLSN`, i.e. the LSN of the first record that
    /// dirtied the page — this file only stores it; confirm against the recovery code.
    pub rec_lsn: Lsn,
}
154
/// Checkpoint snapshot carried inside a `CheckpointEnd` WAL record.
///
/// `TxnWalEntry::new_checkpoint_end` bincode-serializes this struct into the record's `value`
/// and `TxnWalEntry::get_checkpoint_data` decodes it again, so field order and types are part
/// of the encoded format. `Default` yields two empty tables and `begin_checkpoint_lsn == 0`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AriesCheckpointData {
    /// Transaction-table rows captured by the checkpoint.
    pub active_transactions: Vec<AriesTransactionEntry>,
    /// Dirty-page-table rows captured by the checkpoint.
    pub dirty_pages: Vec<AriesDirtyPageEntry>,
    /// NOTE(review): presumably the LSN of the matching begin-checkpoint record — confirm.
    pub begin_checkpoint_lsn: Lsn,
}
165
/// A write buffered inside a [`Transaction`] (see `Transaction::put` / `Transaction::delete`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TxnWrite {
    /// Key being written.
    pub key: Vec<u8>,
    /// New value; `None` is what `Transaction::delete` records.
    pub value: Option<Vec<u8>>,
    /// Table the key belongs to.
    pub table: String,
}
176
/// A `(table, key)` pair a transaction has read; the element type of `Transaction::read_set`.
///
/// Derives `Hash`/`Eq` so repeated reads of the same key collapse to one set entry.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct TxnRead {
    /// Key that was read.
    pub key: Vec<u8>,
    /// Table the key belongs to.
    pub table: String,
}
183
/// A single write-ahead-log record.
///
/// Besides the serde derives, the type has its own binary encoding (`to_bytes` /
/// `from_bytes`) protected by a CRC32 checksum. In that encoding an absent (`None`) and an
/// empty (`Some(vec![])`) `key`, `value`, `table` or `undo_info` are written identically
/// (length 0) and both decode as `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TxnWalEntry {
    /// Kind of record.
    pub record_type: WalRecordType,
    /// Owning transaction (`new_checkpoint_end` uses 0).
    pub txn_id: TxnId,
    /// Caller-supplied timestamp; the name suggests microseconds (not enforced here).
    pub timestamp_us: u64,
    /// Key payload, if the record carries one.
    pub key: Option<Vec<u8>>,
    /// Value payload; for `CheckpointEnd` records this is bincode-encoded `AriesCheckpointData`.
    pub value: Option<Vec<u8>>,
    /// Table name, if the record carries one.
    pub table: Option<String>,
    /// CRC32 (`crc32fast::hash`) over every other field; 0 until `compute_checksum` is called.
    pub checksum: u32,
    // The fields below fall back to their `Default` when missing from serde input.
    /// LSN of this record; all constructors leave it at 0.
    #[serde(default)]
    pub lsn: Lsn,
    /// NOTE(review): presumably the LSN of the same transaction's previous record — confirm.
    #[serde(default)]
    pub prev_lsn: Option<Lsn>,
    /// Page touched by this record, when applicable.
    #[serde(default)]
    pub page_id: Option<PageId>,
    /// Opaque undo payload supplied by the caller of `new_aries_data`.
    #[serde(default)]
    pub undo_info: Option<Vec<u8>>,
    /// Set by `new_clr`; NOTE(review): presumably the next LSN to undo (ARIES CLR) — confirm.
    #[serde(default)]
    pub undo_next_lsn: Option<Lsn>,
}
223
224impl TxnWalEntry {
225 pub fn new_begin(txn_id: TxnId, timestamp_us: u64) -> Self {
226 Self {
227 record_type: WalRecordType::TxnBegin,
228 txn_id,
229 timestamp_us,
230 key: None,
231 value: None,
232 table: None,
233 checksum: 0,
234 lsn: 0,
235 prev_lsn: None,
236 page_id: None,
237 undo_info: None,
238 undo_next_lsn: None,
239 }
240 }
241
242 pub fn new_commit(txn_id: TxnId, timestamp_us: u64) -> Self {
243 Self {
244 record_type: WalRecordType::TxnCommit,
245 txn_id,
246 timestamp_us,
247 key: None,
248 value: None,
249 table: None,
250 checksum: 0,
251 lsn: 0,
252 prev_lsn: None,
253 page_id: None,
254 undo_info: None,
255 undo_next_lsn: None,
256 }
257 }
258
259 pub fn new_abort(txn_id: TxnId, timestamp_us: u64) -> Self {
260 Self {
261 record_type: WalRecordType::TxnAbort,
262 txn_id,
263 timestamp_us,
264 key: None,
265 value: None,
266 table: None,
267 checksum: 0,
268 lsn: 0,
269 prev_lsn: None,
270 page_id: None,
271 undo_info: None,
272 undo_next_lsn: None,
273 }
274 }
275
276 pub fn new_data(
277 txn_id: TxnId,
278 timestamp_us: u64,
279 table: String,
280 key: Vec<u8>,
281 value: Option<Vec<u8>>,
282 ) -> Self {
283 Self {
284 record_type: WalRecordType::Data,
285 txn_id,
286 timestamp_us,
287 key: Some(key),
288 value,
289 table: Some(table),
290 checksum: 0,
291 lsn: 0,
292 prev_lsn: None,
293 page_id: None,
294 undo_info: None,
295 undo_next_lsn: None,
296 }
297 }
298
299 #[allow(clippy::too_many_arguments)]
301 pub fn new_aries_data(
302 txn_id: TxnId,
303 timestamp_us: u64,
304 table: String,
305 key: Vec<u8>,
306 value: Option<Vec<u8>>,
307 page_id: PageId,
308 prev_lsn: Option<Lsn>,
309 undo_info: Option<Vec<u8>>,
310 ) -> Self {
311 Self {
312 record_type: WalRecordType::Data,
313 txn_id,
314 timestamp_us,
315 key: Some(key),
316 value,
317 table: Some(table),
318 checksum: 0,
319 lsn: 0, prev_lsn,
321 page_id: Some(page_id),
322 undo_info,
323 undo_next_lsn: None,
324 }
325 }
326
327 #[allow(clippy::too_many_arguments)]
333 pub fn new_clr(
334 txn_id: TxnId,
335 timestamp_us: u64,
336 table: String,
337 key: Vec<u8>,
338 value: Option<Vec<u8>>,
339 page_id: PageId,
340 prev_lsn: Lsn,
341 undo_next_lsn: Lsn,
342 ) -> Self {
343 Self {
344 record_type: WalRecordType::CompensationLogRecord,
345 txn_id,
346 timestamp_us,
347 key: Some(key),
348 value,
349 table: Some(table),
350 checksum: 0,
351 lsn: 0,
352 prev_lsn: Some(prev_lsn),
353 page_id: Some(page_id),
354 undo_info: None, undo_next_lsn: Some(undo_next_lsn),
356 }
357 }
358
359 pub fn new_checkpoint_end(
361 timestamp_us: u64,
362 checkpoint_data: AriesCheckpointData,
363 ) -> Result<Self, String> {
364 let data = bincode::serialize(&checkpoint_data)
365 .map_err(|e| format!("Failed to serialize checkpoint data: {}", e))?;
366 Ok(Self {
367 record_type: WalRecordType::CheckpointEnd,
368 txn_id: 0,
369 timestamp_us,
370 key: None,
371 value: Some(data),
372 table: None,
373 checksum: 0,
374 lsn: 0,
375 prev_lsn: None,
376 page_id: None,
377 undo_info: None,
378 undo_next_lsn: None,
379 })
380 }
381
382 pub fn get_checkpoint_data(&self) -> Option<AriesCheckpointData> {
384 if self.record_type != WalRecordType::CheckpointEnd {
385 return None;
386 }
387 self.value
388 .as_ref()
389 .and_then(|data| bincode::deserialize(data).ok())
390 }
391
392 pub fn compute_checksum(&mut self) {
394 let data = self.serialize_for_checksum();
395 self.checksum = crc32fast::hash(&data);
396 }
397
398 pub fn verify_checksum(&self) -> bool {
400 let data = self.serialize_for_checksum();
401 crc32fast::hash(&data) == self.checksum
402 }
403
404 fn serialize_for_checksum(&self) -> Vec<u8> {
405 let mut buf = Vec::new();
410 buf.push(self.record_type as u8);
411 buf.extend(&self.txn_id.to_le_bytes());
412 buf.extend(&self.timestamp_us.to_le_bytes());
413 if let Some(ref key) = self.key {
414 buf.extend(&(key.len() as u32).to_le_bytes());
415 buf.extend(key);
416 } else {
417 buf.extend(&0u32.to_le_bytes());
418 }
419 if let Some(ref value) = self.value {
420 buf.extend(&(value.len() as u32).to_le_bytes());
421 buf.extend(value);
422 } else {
423 buf.extend(&0u32.to_le_bytes());
424 }
425 if let Some(ref table) = self.table {
426 buf.extend(&(table.len() as u32).to_le_bytes());
427 buf.extend(table.as_bytes());
428 } else {
429 buf.extend(&0u32.to_le_bytes());
430 }
431 buf.extend(&self.lsn.to_le_bytes());
433 match self.prev_lsn {
435 Some(lsn) => {
436 buf.push(1u8);
437 buf.extend(&lsn.to_le_bytes());
438 }
439 None => buf.push(0u8),
440 }
441 match self.page_id {
443 Some(pid) => {
444 buf.push(1u8);
445 buf.extend(&pid.to_le_bytes());
446 }
447 None => buf.push(0u8),
448 }
449 if let Some(ref undo) = self.undo_info {
451 buf.extend(&(undo.len() as u32).to_le_bytes());
452 buf.extend(undo);
453 } else {
454 buf.extend(&0u32.to_le_bytes());
455 }
456 match self.undo_next_lsn {
458 Some(lsn) => {
459 buf.push(1u8);
460 buf.extend(&lsn.to_le_bytes());
461 }
462 None => buf.push(0u8),
463 }
464 buf
465 }
466
    /// Version byte that `to_bytes` writes as the first byte of every encoded entry.
    ///
    /// `from_bytes` uses this byte, together with a valid record type in the second byte, to
    /// recognise the versioned layout; input that does not match is parsed as the unversioned
    /// legacy layout. Note that the value coincides with `WalRecordType::Data` (0x01).
    pub const FORMAT_VERSION: u8 = 0x01;
474
475 pub fn to_bytes(&self) -> Vec<u8> {
482 let payload = self.serialize_for_checksum();
483 let mut buf = Vec::with_capacity(1 + payload.len() + 4);
484 buf.push(Self::FORMAT_VERSION);
485 buf.extend(&payload);
486 buf.extend(&self.checksum.to_le_bytes());
487 buf
488 }
489
490 pub fn from_bytes(data: &[u8]) -> Result<Self, String> {
504 if data.is_empty() {
505 return Err("WAL entry empty".to_string());
506 }
507
508 let (version, payload_start) = if data.len() >= 2
515 && data[0] == Self::FORMAT_VERSION
516 && WalRecordType::try_from(data[1]).is_ok()
517 {
518 (1u8, 1usize) } else {
520 (0u8, 0usize) };
522
523 let payload = &data[payload_start..];
524
525 if payload.len() < 21 {
527 return Err(format!(
528 "WAL entry too short: {} bytes, need at least 21",
529 payload.len()
530 ));
531 }
532
533 let record_type = WalRecordType::try_from(payload[0])
534 .map_err(|_| format!("Invalid WAL record type: {}", payload[0]))?;
535
536 let txn_id = u64::from_le_bytes(
537 payload[1..9]
538 .try_into()
539 .map_err(|_| "Failed to parse txn_id: slice too short")?,
540 );
541 let timestamp_us = u64::from_le_bytes(
542 payload[9..17]
543 .try_into()
544 .map_err(|_| "Failed to parse timestamp: slice too short")?,
545 );
546
547 let mut offset = 17;
548
549 if offset + 4 > payload.len() {
551 return Err(format!(
552 "WAL entry truncated at key_len: offset {} + 4 > {}",
553 offset,
554 payload.len()
555 ));
556 }
557 let key_len = u32::from_le_bytes(
558 payload[offset..offset + 4]
559 .try_into()
560 .map_err(|_| "Failed to parse key_len")?,
561 ) as usize;
562 offset += 4;
563
564 if offset + key_len > payload.len() {
565 return Err(format!(
566 "WAL entry truncated at key: need {} bytes at offset {}, have {}",
567 key_len,
568 offset,
569 payload.len()
570 ));
571 }
572 let key = if key_len > 0 {
573 Some(payload[offset..offset + key_len].to_vec())
574 } else {
575 None
576 };
577 offset += key_len;
578
579 if offset + 4 > payload.len() {
581 return Err(format!(
582 "WAL entry truncated at value_len: offset {} + 4 > {}",
583 offset,
584 payload.len()
585 ));
586 }
587 let value_len = u32::from_le_bytes(
588 payload[offset..offset + 4]
589 .try_into()
590 .map_err(|_| "Failed to parse value_len")?,
591 ) as usize;
592 offset += 4;
593
594 if offset + value_len > payload.len() {
595 return Err(format!(
596 "WAL entry truncated at value: need {} bytes at offset {}, have {}",
597 value_len,
598 offset,
599 payload.len()
600 ));
601 }
602 let value = if value_len > 0 {
603 Some(payload[offset..offset + value_len].to_vec())
604 } else {
605 None
606 };
607 offset += value_len;
608
609 if offset + 4 > payload.len() {
611 return Err(format!(
612 "WAL entry truncated at table_len: offset {} + 4 > {}",
613 offset,
614 payload.len()
615 ));
616 }
617 let table_len = u32::from_le_bytes(
618 payload[offset..offset + 4]
619 .try_into()
620 .map_err(|_| "Failed to parse table_len")?,
621 ) as usize;
622 offset += 4;
623
624 if offset + table_len > payload.len() {
625 return Err(format!(
626 "WAL entry truncated at table: need {} bytes at offset {}, have {}",
627 table_len,
628 offset,
629 payload.len()
630 ));
631 }
632 let table = if table_len > 0 {
633 Some(
634 String::from_utf8(payload[offset..offset + table_len].to_vec())
635 .map_err(|e| format!("Invalid UTF-8 in table name: {}", e))?,
636 )
637 } else {
638 None
639 };
640 offset += table_len;
641
642 let (lsn, prev_lsn, page_id, undo_info, undo_next_lsn) = if version >= 1 {
644 if offset + 8 > payload.len() {
646 return Err(format!(
647 "WAL V1 entry truncated at lsn: offset {} + 8 > {}",
648 offset,
649 payload.len()
650 ));
651 }
652 let lsn = u64::from_le_bytes(
653 payload[offset..offset + 8]
654 .try_into()
655 .map_err(|_| "Failed to parse lsn")?,
656 );
657 offset += 8;
658
659 if offset >= payload.len() {
661 return Err("WAL V1 entry truncated at prev_lsn tag".to_string());
662 }
663 let prev_lsn = if payload[offset] == 1 {
664 offset += 1;
665 if offset + 8 > payload.len() {
666 return Err("WAL V1 entry truncated at prev_lsn value".to_string());
667 }
668 let v = u64::from_le_bytes(
669 payload[offset..offset + 8]
670 .try_into()
671 .map_err(|_| "Failed to parse prev_lsn")?,
672 );
673 offset += 8;
674 Some(v)
675 } else {
676 offset += 1;
677 None
678 };
679
680 if offset >= payload.len() {
682 return Err("WAL V1 entry truncated at page_id tag".to_string());
683 }
684 let page_id = if payload[offset] == 1 {
685 offset += 1;
686 if offset + 8 > payload.len() {
687 return Err("WAL V1 entry truncated at page_id value".to_string());
688 }
689 let v = u64::from_le_bytes(
690 payload[offset..offset + 8]
691 .try_into()
692 .map_err(|_| "Failed to parse page_id")?,
693 );
694 offset += 8;
695 Some(v)
696 } else {
697 offset += 1;
698 None
699 };
700
701 if offset + 4 > payload.len() {
703 return Err("WAL V1 entry truncated at undo_info_len".to_string());
704 }
705 let undo_len = u32::from_le_bytes(
706 payload[offset..offset + 4]
707 .try_into()
708 .map_err(|_| "Failed to parse undo_info_len")?,
709 ) as usize;
710 offset += 4;
711 let undo_info = if undo_len > 0 {
712 if offset + undo_len > payload.len() {
713 return Err("WAL V1 entry truncated at undo_info data".to_string());
714 }
715 let v = payload[offset..offset + undo_len].to_vec();
716 offset += undo_len;
717 Some(v)
718 } else {
719 None
720 };
721
722 if offset >= payload.len() {
724 return Err("WAL V1 entry truncated at undo_next_lsn tag".to_string());
725 }
726 let undo_next_lsn = if payload[offset] == 1 {
727 offset += 1;
728 if offset + 8 > payload.len() {
729 return Err("WAL V1 entry truncated at undo_next_lsn value".to_string());
730 }
731 let v = u64::from_le_bytes(
732 payload[offset..offset + 8]
733 .try_into()
734 .map_err(|_| "Failed to parse undo_next_lsn")?,
735 );
736 offset += 8;
737 Some(v)
738 } else {
739 offset += 1;
740 None
741 };
742
743 (lsn, prev_lsn, page_id, undo_info, undo_next_lsn)
744 } else {
745 (0u64, None, None, None, None)
747 };
748
749 if offset + 4 > payload.len() {
751 return Err(format!(
752 "WAL entry truncated at checksum: offset {} + 4 > {}",
753 offset,
754 payload.len()
755 ));
756 }
757 let checksum = u32::from_le_bytes(
758 payload[offset..offset + 4]
759 .try_into()
760 .map_err(|_| "Failed to parse checksum")?,
761 );
762
763 let entry = Self {
764 record_type,
765 txn_id,
766 timestamp_us,
767 key,
768 value,
769 table,
770 checksum,
771 lsn,
772 prev_lsn,
773 page_id,
774 undo_info,
775 undo_next_lsn,
776 };
777
778 if !entry.verify_checksum() {
780 return Err(format!(
781 "WAL entry checksum mismatch for txn_id {}: expected valid checksum, got {}",
782 entry.txn_id, entry.checksum
783 ));
784 }
785
786 Ok(entry)
787 }
788}
789
/// In-memory state of one transaction: identity, lifecycle state, timestamps and the writes
/// and reads it has accumulated so far.
#[derive(Debug)]
pub struct Transaction {
    /// Identifier assigned by `TransactionManager`.
    pub id: TxnId,
    /// Current lifecycle state; starts as `Active`.
    pub state: TxnState,
    /// Timestamp taken from the manager's counter when the transaction began.
    pub start_ts: u64,
    /// Timestamp assigned by `TransactionManager::mark_committed`; `None` until then.
    pub commit_ts: Option<u64>,
    /// Buffered writes in the order they were issued (`put` / `delete` append here).
    pub writes: Vec<TxnWrite>,
    /// Distinct `(table, key)` pairs recorded via `record_read`.
    pub read_set: HashSet<TxnRead>,
    /// Isolation level requested when the transaction was started.
    pub isolation: IsolationLevel,
}
808
/// Isolation level requested for a transaction.
///
/// This file only stores the level on the `Transaction`; enforcement is not visible here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    ReadCommitted,
    /// The default level (used by `TransactionManager::begin`).
    #[default]
    SnapshotIsolation,
    Serializable,
}
820
821impl Transaction {
822 pub fn new(id: TxnId, start_ts: u64, isolation: IsolationLevel) -> Self {
824 Self {
825 id,
826 state: TxnState::Active,
827 start_ts,
828 commit_ts: None,
829 writes: Vec::new(),
830 read_set: HashSet::new(),
831 isolation,
832 }
833 }
834
835 pub fn put(&mut self, table: &str, key: Vec<u8>, value: Vec<u8>) {
837 self.writes.push(TxnWrite {
838 key,
839 value: Some(value),
840 table: table.to_string(),
841 });
842 }
843
844 pub fn delete(&mut self, table: &str, key: Vec<u8>) {
846 self.writes.push(TxnWrite {
847 key,
848 value: None,
849 table: table.to_string(),
850 });
851 }
852
853 pub fn record_read(&mut self, table: &str, key: Vec<u8>) {
855 self.read_set.insert(TxnRead {
856 key,
857 table: table.to_string(),
858 });
859 }
860
861 pub fn get_local(&self, table: &str, key: &[u8]) -> Option<&TxnWrite> {
863 self.writes
864 .iter()
865 .rev()
866 .find(|w| w.table == table && w.key == key)
867 }
868
869 pub fn is_read_only(&self) -> bool {
871 self.writes.is_empty()
872 }
873}
874
/// Counters maintained by `TransactionManager`; `Default` starts every counter at 0.
#[derive(Debug, Clone, Default)]
pub struct TxnStats {
    /// Transactions begun and not yet marked committed or aborted.
    pub active_count: u64,
    /// Transactions marked committed.
    pub committed_count: u64,
    /// Transactions marked aborted (includes conflict aborts).
    pub aborted_count: u64,
    /// Subset of `aborted_count` recorded through `mark_conflict_abort`.
    pub conflict_aborts: u64,
}
883
/// Hands out transaction ids and timestamps and keeps aggregate transaction statistics.
pub struct TransactionManager {
    /// Next transaction id to hand out; starts at 1.
    next_txn_id: AtomicU64,
    /// Single counter that supplies both start and commit timestamps; starts at 1.
    timestamp_counter: AtomicU64,
    /// Value returned by `oldest_active_ts`; starts at 0 and only ever grows
    /// (`advance_watermark` uses `fetch_max`).
    committed_watermark: AtomicU64,
    /// Aggregate counters, guarded by a read/write lock.
    stats: parking_lot::RwLock<TxnStats>,
}
905
906impl TransactionManager {
907 pub fn new() -> Self {
908 Self {
909 next_txn_id: AtomicU64::new(1),
910 timestamp_counter: AtomicU64::new(1),
911 committed_watermark: AtomicU64::new(0),
912 stats: parking_lot::RwLock::new(TxnStats::default()),
913 }
914 }
915
916 pub fn begin(&self) -> Transaction {
918 self.begin_with_isolation(IsolationLevel::default())
919 }
920
921 pub fn begin_with_isolation(&self, isolation: IsolationLevel) -> Transaction {
923 let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
924 let start_ts = self.timestamp_counter.fetch_add(1, Ordering::SeqCst);
925
926 {
927 let mut stats = self.stats.write();
928 stats.active_count += 1;
929 }
930
931 Transaction::new(txn_id, start_ts, isolation)
932 }
933
934 pub fn get_commit_ts(&self) -> u64 {
936 self.timestamp_counter.fetch_add(1, Ordering::SeqCst)
937 }
938
939 pub fn mark_committed(&self, txn: &mut Transaction) {
941 txn.state = TxnState::Committed;
942 txn.commit_ts = Some(self.get_commit_ts());
943
944 let mut stats = self.stats.write();
945 stats.active_count = stats.active_count.saturating_sub(1);
946 stats.committed_count += 1;
947 }
948
949 pub fn mark_aborted(&self, txn: &mut Transaction) {
951 txn.state = TxnState::Aborted;
952
953 let mut stats = self.stats.write();
954 stats.active_count = stats.active_count.saturating_sub(1);
955 stats.aborted_count += 1;
956 }
957
958 pub fn mark_conflict_abort(&self, txn: &mut Transaction) {
960 self.mark_aborted(txn);
961
962 let mut stats = self.stats.write();
963 stats.conflict_aborts += 1;
964 }
965
966 pub fn oldest_active_ts(&self) -> u64 {
968 self.committed_watermark.load(Ordering::SeqCst)
969 }
970
971 pub fn advance_watermark(&self, new_watermark: u64) {
973 self.committed_watermark
974 .fetch_max(new_watermark, Ordering::SeqCst);
975 }
976
977 pub fn stats(&self) -> TxnStats {
979 self.stats.read().clone()
980 }
981}
982
/// `Default` is equivalent to [`TransactionManager::new`].
impl Default for TransactionManager {
    fn default() -> Self {
        Self::new()
    }
}
988
#[cfg(test)]
mod tests {
    use super::*;

    /// Offset of the key-length prefix inside `TxnWalEntry::to_bytes` output:
    /// format version (1) + record type (1) + txn_id (8) + timestamp_us (8).
    const KEY_LEN_OFFSET: usize = 1 + 1 + 8 + 8;

    #[test]
    fn test_transaction_lifecycle() {
        let mgr = TransactionManager::new();

        let mut txn = mgr.begin();
        assert_eq!(txn.state, TxnState::Active);
        assert!(txn.is_read_only());

        txn.put("users", vec![1], vec![2, 3, 4]);
        assert!(!txn.is_read_only());

        mgr.mark_committed(&mut txn);
        assert_eq!(txn.state, TxnState::Committed);
        assert!(txn.commit_ts.is_some());
    }

    #[test]
    fn test_read_your_writes() {
        let mgr = TransactionManager::new();
        let mut txn = mgr.begin();

        txn.put("users", vec![1], vec![10, 20]);
        // The second write to the same key must shadow the first.
        txn.put("users", vec![1], vec![30, 40]);

        let local = txn.get_local("users", &[1]);
        assert!(local.is_some());
        assert_eq!(local.unwrap().value, Some(vec![30, 40]));
    }

    #[test]
    fn test_wal_entry_serialization() {
        let mut entry = TxnWalEntry::new_data(
            42,
            1234567890,
            "users".to_string(),
            vec![1, 2, 3],
            Some(vec![4, 5, 6]),
        );
        entry.compute_checksum();

        let bytes = entry.to_bytes();
        let parsed = TxnWalEntry::from_bytes(&bytes).unwrap();

        assert_eq!(parsed.txn_id, 42);
        assert_eq!(parsed.timestamp_us, 1234567890);
        assert_eq!(parsed.table, Some("users".to_string()));
        assert_eq!(parsed.key, Some(vec![1, 2, 3]));
        assert_eq!(parsed.value, Some(vec![4, 5, 6]));
        assert!(parsed.verify_checksum());
    }

    #[test]
    fn test_wal_entry_aries_roundtrip() {
        let mut entry = TxnWalEntry::new_aries_data(
            99,
            9999999,
            "orders".to_string(),
            vec![10, 20],
            Some(vec![30, 40, 50]),
            42,                     // page_id
            Some(100),              // prev_lsn
            Some(vec![0xDE, 0xAD]), // undo_info
        );
        entry.lsn = 200;
        entry.undo_next_lsn = Some(50);
        entry.compute_checksum();

        let bytes = entry.to_bytes();
        let parsed = TxnWalEntry::from_bytes(&bytes).unwrap();

        assert_eq!(parsed.txn_id, 99);
        assert_eq!(parsed.lsn, 200);
        assert_eq!(parsed.prev_lsn, Some(100));
        assert_eq!(parsed.page_id, Some(42));
        assert_eq!(parsed.undo_info, Some(vec![0xDE, 0xAD]));
        assert_eq!(parsed.undo_next_lsn, Some(50));
        assert_eq!(parsed.key, Some(vec![10, 20]));
        assert_eq!(parsed.value, Some(vec![30, 40, 50]));
        assert!(parsed.verify_checksum());
    }

    #[test]
    fn test_wal_entry_clr_roundtrip() {
        let mut entry = TxnWalEntry::new_clr(
            77,
            5555555,
            "inventory".to_string(),
            vec![1],
            Some(vec![2]),
            10,  // page_id
            300, // prev_lsn
            250, // undo_next_lsn
        );
        entry.lsn = 400;
        entry.compute_checksum();

        let bytes = entry.to_bytes();
        let parsed = TxnWalEntry::from_bytes(&bytes).unwrap();

        assert_eq!(parsed.record_type, WalRecordType::CompensationLogRecord);
        assert_eq!(parsed.lsn, 400);
        assert_eq!(parsed.prev_lsn, Some(300));
        assert_eq!(parsed.page_id, Some(10));
        assert_eq!(parsed.undo_next_lsn, Some(250));
        // CLRs never carry undo information.
        assert!(parsed.undo_info.is_none());
        assert!(parsed.verify_checksum());
    }

    #[test]
    fn test_wal_entry_none_aries_fields_roundtrip() {
        let mut entry = TxnWalEntry::new_begin(1, 100);
        entry.compute_checksum();

        let bytes = entry.to_bytes();
        let parsed = TxnWalEntry::from_bytes(&bytes).unwrap();

        assert_eq!(parsed.lsn, 0);
        assert_eq!(parsed.prev_lsn, None);
        assert_eq!(parsed.page_id, None);
        assert_eq!(parsed.undo_info, None);
        assert_eq!(parsed.undo_next_lsn, None);
        assert!(parsed.verify_checksum());
    }

    #[test]
    fn test_transaction_stats() {
        let mgr = TransactionManager::new();

        let mut txn1 = mgr.begin();
        let mut txn2 = mgr.begin();

        assert_eq!(mgr.stats().active_count, 2);

        mgr.mark_committed(&mut txn1);
        assert_eq!(mgr.stats().committed_count, 1);

        mgr.mark_aborted(&mut txn2);
        assert_eq!(mgr.stats().aborted_count, 1);
        assert_eq!(mgr.stats().active_count, 0);
    }

    #[test]
    fn test_conflict_abort_stats() {
        let mgr = TransactionManager::new();
        let mut txn = mgr.begin();

        mgr.mark_conflict_abort(&mut txn);
        assert_eq!(txn.state, TxnState::Aborted);

        // A conflict abort is counted once as an abort and once as a conflict.
        let stats = mgr.stats();
        assert_eq!(stats.active_count, 0);
        assert_eq!(stats.aborted_count, 1);
        assert_eq!(stats.conflict_aborts, 1);
    }

    #[test]
    fn test_wal_entry_error_empty_input() {
        let result = TxnWalEntry::from_bytes(&[]);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("empty"));
    }

    #[test]
    fn test_wal_entry_error_too_short() {
        let short_data = vec![0u8; 10];
        let result = TxnWalEntry::from_bytes(&short_data);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("too short"));
    }

    #[test]
    fn test_wal_entry_error_invalid_record_type() {
        let mut data = vec![0u8; 30];
        // 255 is neither the format version nor a known record type.
        data[0] = 255;

        let result = TxnWalEntry::from_bytes(&data);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("Invalid WAL record type"));
    }

    #[test]
    fn test_wal_entry_error_truncated_key() {
        let mut entry =
            TxnWalEntry::new_data(1, 100, "test".to_string(), vec![1, 2], Some(vec![3, 4]));
        entry.compute_checksum();
        let mut bytes = entry.to_bytes();

        // Overwrite the key-length prefix itself with a length far beyond the buffer. The
        // offset has to account for the leading format-version byte.
        let huge_len: u32 = 10000;
        bytes[KEY_LEN_OFFSET..KEY_LEN_OFFSET + 4].copy_from_slice(&huge_len.to_le_bytes());

        let result = TxnWalEntry::from_bytes(&bytes);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("truncated at key"));
    }

    #[test]
    fn test_wal_entry_error_corrupted_checksum() {
        let mut entry = TxnWalEntry::new_data(
            42,
            1234567890,
            "users".to_string(),
            vec![1, 2, 3],
            Some(vec![4, 5, 6]),
        );
        entry.compute_checksum();

        let mut bytes = entry.to_bytes();
        let len = bytes.len();
        // Flip every bit of the last checksum byte.
        bytes[len - 1] ^= 0xFF;

        let result = TxnWalEntry::from_bytes(&bytes);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("checksum mismatch"));
    }

    #[test]
    fn test_wal_entry_error_invalid_utf8_table() {
        let mut entry = TxnWalEntry::new_data(1, 100, "test".to_string(), vec![1], Some(vec![2]));
        entry.compute_checksum();
        let mut bytes = entry.to_bytes();

        // key_len (4) + key (1) + value_len (4) + value (1) + table_len (4) follow the header,
        // so this is the first byte of the table name, not part of its length prefix.
        let table_start = KEY_LEN_OFFSET + 4 + 1 + 4 + 1 + 4;
        // 0xFF can never appear in valid UTF-8.
        bytes[table_start] = 0xFF;

        let result = TxnWalEntry::from_bytes(&bytes);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("Invalid UTF-8"));
    }
}