1use chrono::Utc;
4use crc32fast::Hasher as Crc32Hasher;
5use serde::{Deserialize, Serialize};
6use tracing::trace;
7
8use crate::{Result, WalError};
9
/// A buffer of exactly 32 bytes, stored inline as a fixed-size array.
///
/// Values are normally built from a byte slice through `From<&[u8]>` and are
/// read back as a plain `[u8]` through `Deref`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FixedBytes32([u8; 32]);
13
14impl Serialize for FixedBytes32 {
15 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
16 where
17 S: serde::Serializer,
18 {
19 serializer.serialize_bytes(&self.0)
20 }
21}
22
23impl<'de> Deserialize<'de> for FixedBytes32 {
24 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
25 where
26 D: serde::Deserializer<'de>,
27 {
28 let bytes: &[u8] = serde::Deserialize::deserialize(deserializer)?;
29 let mut arr = [0u8; 32];
30 #[allow(clippy::indexing_slicing, reason = "safe slicing with min length")]
31 arr.copy_from_slice(&bytes[.. 32.min(bytes.len())]);
32 if bytes.len() < 32 {
33 #[allow(clippy::indexing_slicing, reason = "safe slicing for padding")]
35 arr[bytes.len() ..].fill(0);
36 }
37 Ok(Self(arr))
38 }
39}
40
41impl std::ops::Deref for FixedBytes32 {
42 type Target = [u8];
43
44 fn deref(&self) -> &Self::Target { &self.0 }
45}
46
47impl std::ops::DerefMut for FixedBytes32 {
48 fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 }
49}
50
51impl From<&[u8]> for FixedBytes32 {
52 fn from(bytes: &[u8]) -> Self {
53 let mut temp = bytes.to_vec();
54 let len = temp.len();
55 #[allow(
56 clippy::arithmetic_side_effects,
57 reason = "safe arithmetic for padding calculation"
58 )]
59 let padded_len = len.div_ceil(16) * 16;
60 temp.resize(padded_len, 0);
61 let mut arr = [0u8; 32];
62 let copy_len = temp.len().min(32);
63 #[allow(
64 clippy::indexing_slicing,
65 reason = "safe slicing with calculated lengths"
66 )]
67 arr[.. copy_len].copy_from_slice(&temp[.. copy_len]);
68 Self(arr)
69 }
70}
71
/// A byte buffer backed by a `Vec<u8>`.
///
/// When built through `From<&[u8]>` it holds at most 256 bytes, zero-padded to a
/// multiple of 16. The `Deserialize` impl stores the bytes exactly as received.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FixedBytes256(Vec<u8>);
75
76impl Serialize for FixedBytes256 {
77 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
78 where
79 S: serde::Serializer,
80 {
81 serializer.serialize_bytes(&self.0)
82 }
83}
84
85impl<'de> Deserialize<'de> for FixedBytes256 {
86 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
87 where
88 D: serde::Deserializer<'de>,
89 {
90 let bytes: &[u8] = serde::Deserialize::deserialize(deserializer)?;
91 Ok(Self(bytes.to_vec()))
92 }
93}
94
95impl std::ops::Deref for FixedBytes256 {
96 type Target = [u8];
97
98 fn deref(&self) -> &Self::Target { &self.0 }
99}
100
101impl std::ops::DerefMut for FixedBytes256 {
102 fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 }
103}
104
105impl From<&[u8]> for FixedBytes256 {
106 fn from(bytes: &[u8]) -> Self {
107 let mut temp = bytes.to_vec();
108 let len = temp.len();
109 #[allow(
110 clippy::arithmetic_side_effects,
111 reason = "safe arithmetic for padding calculation"
112 )]
113 let padded_len = len.div_ceil(16) * 16;
114 if padded_len > 256 {
115 temp.truncate(256);
116 temp.resize(256, 0);
118 }
119 else {
120 temp.resize(padded_len, 0);
121 }
122 Self(temp)
123 }
124}
125
126#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
128pub enum EntryType {
129 Begin,
131 Insert,
133 Update,
135 Delete,
137 Commit,
139 Rollback,
141}
142
143#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
145pub struct LogEntry {
146 pub entry_type: EntryType,
148 pub transaction_id: FixedBytes32,
150 pub collection: FixedBytes256,
152 pub document_id: FixedBytes256,
154 pub timestamp: u64,
156 pub data: Option<String>,
158}
159
160impl LogEntry {
161 pub fn new(
200 entry_type: EntryType,
201 collection: String,
202 document_id: String,
203 data: Option<serde_json::Value>,
204 ) -> Self {
205 let transaction_id = cuid2::CuidConstructor::new().with_length(32).create_id();
206 let data_str = data.map(|v| v.to_string());
207 Self {
208 entry_type,
209 transaction_id: FixedBytes32::from(transaction_id.as_bytes()),
210 collection: FixedBytes256::from(collection.as_bytes()),
211 document_id: FixedBytes256::from(document_id.as_bytes()),
212 timestamp: Utc::now().timestamp_millis() as u64,
213 data: data_str,
214 }
215 }
216
217 pub fn to_bytes(&self) -> Result<Vec<u8>> {
246 let serialized =
247 postcard::to_stdvec(self).map_err(|e: postcard::Error| WalError::Serialization(e.to_string()))?;
248 let mut hasher = Crc32Hasher::new();
249 hasher.update(&serialized);
250 let checksum = hasher.finalize();
251
252 let mut bytes = Vec::new();
253 bytes.extend_from_slice(&serialized);
254 bytes.extend_from_slice(&checksum.to_le_bytes());
255
256 trace!(
257 "Serialized entry to {} bytes (entry_type: {:?})",
258 bytes.len(),
259 self.entry_type
260 );
261 Ok(bytes)
262 }
263
264 pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
305 if bytes.len() < 4 {
306 return Err(WalError::InvalidEntry("Entry too short".to_owned()));
307 }
308
309 #[allow(
310 clippy::arithmetic_side_effects,
311 reason = "safe subtraction since len >= 4"
312 )]
313 let data_len = bytes.len() - 4;
314 let (data, checksum_bytes) = bytes.split_at(data_len);
315 let expected_checksum = u32::from_le_bytes(checksum_bytes.try_into().unwrap());
316
317 let mut hasher = Crc32Hasher::new();
318 hasher.update(data);
319 let actual_checksum = hasher.finalize();
320
321 if actual_checksum != expected_checksum {
322 return Err(WalError::ChecksumMismatch);
323 }
324
325 let entry: Self =
326 postcard::from_bytes(data).map_err(|e: postcard::Error| WalError::Serialization(e.to_string()))?;
327 trace!(
328 "Deserialized binary entry (entry_type: {:?})",
329 entry.entry_type
330 );
331 Ok(entry)
332 }
333
334 pub fn to_json(&self) -> Result<String> {
363 let json_value = serde_json::json!({
364 "entry_type": self.entry_type,
365 "transaction_id": self.transaction_id_str(),
366 "collection": self.collection_str(),
367 "document_id": self.document_id_str(),
368 "timestamp": self.timestamp,
369 "data": self.data
370 });
371 let json_str = serde_json::to_string(&json_value)
372 .map_err(|e| WalError::Serialization(format!("JSON serialization error: {}", e)))?;
373 trace!(
374 "Serialized entry to JSON (entry_type: {:?})",
375 self.entry_type
376 );
377 Ok(json_str)
378 }
379
380 pub fn from_json(json_str: &str) -> Result<Self> {
419 let json_value: serde_json::Value = serde_json::from_str(json_str)
420 .map_err(|e| WalError::Serialization(format!("JSON parsing error: {}", e)))?;
421
422 let entry_type = match json_value.get("entry_type") {
423 Some(v) => {
424 serde_json::from_value(v.clone())
425 .map_err(|e| WalError::Serialization(format!("Invalid entry_type: {}", e)))?
426 },
427 None => return Err(WalError::InvalidEntry("Missing entry_type".to_owned())),
428 };
429
430 let transaction_id = match json_value.get("transaction_id") {
431 Some(v) => {
432 v.as_str()
433 .ok_or_else(|| WalError::InvalidEntry("transaction_id must be string".to_owned()))?
434 },
435 None => return Err(WalError::InvalidEntry("Missing transaction_id".to_owned())),
436 };
437
438 let collection = match json_value.get("collection") {
439 Some(v) => {
440 v.as_str()
441 .ok_or_else(|| WalError::InvalidEntry("collection must be string".to_owned()))?
442 },
443 None => return Err(WalError::InvalidEntry("Missing collection".to_owned())),
444 };
445
446 let document_id = match json_value.get("document_id") {
447 Some(v) => {
448 v.as_str()
449 .ok_or_else(|| WalError::InvalidEntry("document_id must be string".to_owned()))?
450 },
451 None => return Err(WalError::InvalidEntry("Missing document_id".to_owned())),
452 };
453
454 let timestamp = match json_value.get("timestamp") {
455 Some(v) => {
456 v.as_u64()
457 .ok_or_else(|| WalError::InvalidEntry("timestamp must be number".to_owned()))?
458 },
459 None => return Err(WalError::InvalidEntry("Missing timestamp".to_owned())),
460 };
461
462 let data = json_value
463 .get("data")
464 .and_then(|v| v.as_str())
465 .map(|s| s.to_owned());
466
467 let entry = Self {
468 entry_type,
469 transaction_id: FixedBytes32::from(transaction_id.as_bytes()),
470 collection: FixedBytes256::from(collection.as_bytes()),
471 document_id: FixedBytes256::from(document_id.as_bytes()),
472 timestamp,
473 data,
474 };
475 trace!(
476 "Deserialized JSON entry (entry_type: {:?})",
477 entry.entry_type
478 );
479 Ok(entry)
480 }
481
482 pub fn data_as_value(&self) -> Result<Option<serde_json::Value>> {
510 match self.data.as_ref() {
511 Some(s) => {
512 let value: serde_json::Value =
513 serde_json::from_str(s).map_err(|e| WalError::Serialization(format!("Invalid JSON: {}", e)))?;
514 Ok(Some(value))
515 },
516 None => Ok(None),
517 }
518 }
519
520 pub fn transaction_id_str(&self) -> &str {
547 std::str::from_utf8(&self.transaction_id)
548 .unwrap()
549 .trim_end_matches('\0')
550 }
551
552 pub fn collection_str(&self) -> &str {
576 std::str::from_utf8(&self.collection)
577 .unwrap()
578 .trim_end_matches('\0')
579 }
580
581 pub fn document_id_str(&self) -> &str {
605 std::str::from_utf8(&self.document_id)
606 .unwrap()
607 .trim_end_matches('\0')
608 }
609}
610
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Shorthand for `LogEntry::new` that accepts string slices.
    fn make_entry(
        kind: EntryType,
        collection: &str,
        document_id: &str,
        data: Option<serde_json::Value>,
    ) -> LogEntry {
        LogEntry::new(kind, collection.to_string(), document_id.to_string(), data)
    }

    // ---- FixedBytes32 ----

    #[test]
    fn test_fixed_bytes32_from_slice() {
        let input: &[u8] = b"hello world";
        let fixed = FixedBytes32::from(input);
        assert_eq!(&fixed[.. 11], input);
    }

    #[test]
    fn test_fixed_bytes32_from_slice_longer_than_32() {
        let input: &[u8] = b"this is a very long string that exceeds 32 bytes in length";
        let fixed = FixedBytes32::from(input);
        assert_eq!(fixed.len(), 32);
        assert_eq!(&fixed[.. 32], &input[.. 32]);
    }

    #[test]
    fn test_fixed_bytes32_serialization() {
        let fixed = FixedBytes32::from(b"test data" as &[u8]);
        let encoded = serde_json::to_string(&fixed).unwrap();
        assert!(!encoded.is_empty());
    }

    #[test]
    fn test_fixed_bytes32_equality() {
        let first = FixedBytes32::from(b"same" as &[u8]);
        let second = FixedBytes32::from(b"same" as &[u8]);
        assert_eq!(first, second);

        let other = FixedBytes32::from(b"different" as &[u8]);
        assert_ne!(first, other);
    }

    #[test]
    fn test_fixed_bytes32_clone() {
        let source = FixedBytes32::from(b"test" as &[u8]);
        let copy = source.clone();
        assert_eq!(source, copy);
    }

    #[test]
    fn test_fixed_bytes32_deref() {
        let fixed = FixedBytes32::from(b"hello" as &[u8]);
        let view: &[u8] = &fixed;
        assert_eq!(&view[.. 5], b"hello");
    }

    #[test]
    fn test_fixed_bytes32_deref_mut() {
        let mut fixed = FixedBytes32::from(b"hello" as &[u8]);
        fixed[0] = b'H';
        assert_eq!(fixed[0], b'H');
    }

    // ---- FixedBytes256 ----

    #[test]
    fn test_fixed_bytes256_from_slice() {
        let input: &[u8] = b"collection name";
        let fixed = FixedBytes256::from(input);
        assert_eq!(&fixed[.. 15], input);
    }

    #[test]
    fn test_fixed_bytes256_serialization() {
        let fixed = FixedBytes256::from(b"test_collection" as &[u8]);
        let encoded = serde_json::to_string(&fixed).unwrap();
        assert!(!encoded.is_empty());
    }

    #[test]
    fn test_fixed_bytes256_equality() {
        let first = FixedBytes256::from(b"collection" as &[u8]);
        let second = FixedBytes256::from(b"collection" as &[u8]);
        assert_eq!(first, second);
    }

    #[test]
    fn test_fixed_bytes256_clone() {
        let source = FixedBytes256::from(b"document-id" as &[u8]);
        let copy = source.clone();
        assert_eq!(source, copy);
    }

    #[test]
    fn test_fixed_bytes256_padding() {
        let fixed = FixedBytes256::from(b"test" as &[u8]);
        assert_eq!(fixed.len() % 16, 0);
    }

    // ---- EntryType ----

    #[test]
    fn test_entry_type_equality() {
        assert_eq!(EntryType::Insert, EntryType::Insert);
        assert_ne!(EntryType::Insert, EntryType::Delete);
    }

    #[test]
    fn test_entry_type_clone() {
        let source = EntryType::Update;
        let copy = source.clone();
        assert_eq!(source, copy);
    }

    #[test]
    fn test_entry_type_serialization() {
        let all_kinds = vec![
            EntryType::Begin,
            EntryType::Insert,
            EntryType::Update,
            EntryType::Delete,
            EntryType::Commit,
            EntryType::Rollback,
        ];

        for kind in all_kinds {
            let encoded = serde_json::to_string(&kind).unwrap();
            let decoded: EntryType = serde_json::from_str(&encoded).unwrap();
            assert_eq!(kind, decoded);
        }
    }

    #[test]
    fn test_entry_type_debug() {
        let rendered = format!("{:?}", EntryType::Insert);
        assert!(rendered.contains("Insert"));
    }

    // ---- LogEntry ----

    #[test]
    fn test_log_entry_new_with_data() {
        let entry = make_entry(
            EntryType::Insert,
            "users",
            "user-123",
            Some(json!({"name": "Alice"})),
        );

        assert_eq!(entry.entry_type, EntryType::Insert);
        assert!(entry.data.is_some());
        assert!(entry.timestamp > 0);
    }

    #[test]
    fn test_log_entry_new_without_data() {
        let entry = make_entry(EntryType::Delete, "users", "user-123", None);

        assert_eq!(entry.entry_type, EntryType::Delete);
        assert!(entry.data.is_none());
    }

    #[test]
    fn test_log_entry_to_bytes() {
        let entry = make_entry(
            EntryType::Insert,
            "users",
            "user-123",
            Some(json!({"name": "Bob"})),
        );

        let encoded = entry.to_bytes().unwrap();
        assert!(!encoded.is_empty());
        assert!(encoded.len() > 4);
    }

    #[test]
    fn test_log_entry_from_bytes_roundtrip() {
        let original = make_entry(
            EntryType::Update,
            "orders",
            "order-456",
            Some(json!({"status": "shipped", "cost": 99.99})),
        );

        let encoded = original.to_bytes().unwrap();
        let restored = LogEntry::from_bytes(&encoded).unwrap();

        assert_eq!(original.entry_type, restored.entry_type);
        assert_eq!(original.data, restored.data);
    }

    #[test]
    fn test_log_entry_from_bytes_invalid_checksum() {
        let entry = make_entry(EntryType::Insert, "users", "user-1", None);

        let mut encoded = entry.to_bytes().unwrap();
        // Corrupt the final checksum byte.
        *encoded.last_mut().unwrap() ^= 0xff;

        assert!(LogEntry::from_bytes(&encoded).is_err());
    }

    #[test]
    fn test_log_entry_from_bytes_truncated() {
        let outcome = LogEntry::from_bytes(b"truncated");
        assert!(outcome.is_err());
    }

    #[test]
    fn test_log_entry_collection_str() {
        let entry = make_entry(EntryType::Insert, "my_collection", "doc-1", None);
        assert_eq!(entry.collection_str(), "my_collection");
    }

    #[test]
    fn test_log_entry_document_id_str() {
        let entry = make_entry(EntryType::Insert, "users", "user-abc-123", None);
        assert_eq!(entry.document_id_str(), "user-abc-123");
    }

    #[test]
    fn test_log_entry_collection_str_with_nulls() {
        let entry = make_entry(EntryType::Insert, "collection", "doc", None);
        let name = entry.collection_str();
        assert_eq!(name, "collection");
    }

    #[test]
    fn test_log_entry_document_id_str_with_nulls() {
        let entry = make_entry(EntryType::Delete, "col", "document-id", None);
        let id = entry.document_id_str();
        assert_eq!(id, "document-id");
    }

    #[test]
    fn test_log_entry_clone() {
        let original = make_entry(
            EntryType::Insert,
            "users",
            "user-1",
            Some(json!({"name": "Test"})),
        );

        let copy = original.clone();
        assert_eq!(original.entry_type, copy.entry_type);
        assert_eq!(original.data, copy.data);
    }

    #[test]
    fn test_log_entry_equality() {
        let first = make_entry(
            EntryType::Insert,
            "users",
            "user-1",
            Some(json!({"name": "Alice"})),
        );

        let second = first.clone();
        assert_eq!(first, second);
    }

    #[test]
    fn test_log_entry_various_entry_types() {
        let cases = vec![
            (EntryType::Begin, "test_col", "txn-1", None),
            (EntryType::Insert, "users", "user-1", Some(json!({"id": 1}))),
            (EntryType::Update, "users", "user-2", Some(json!({"id": 2}))),
            (EntryType::Delete, "users", "user-3", None),
            (EntryType::Commit, "test_col", "txn-2", None),
            (EntryType::Rollback, "test_col", "txn-3", None),
        ];

        for (kind, collection, document_id, data) in cases {
            let entry = make_entry(kind, collection, document_id, data);
            assert_eq!(entry.entry_type, kind);
            assert_eq!(entry.collection_str(), collection);
            assert_eq!(entry.document_id_str(), document_id);
        }
    }

    #[test]
    fn test_log_entry_postcard_roundtrip_with_json() {
        let entry = make_entry(
            EntryType::Insert,
            "users",
            "user-1",
            Some(json!({"name": "Dave", "age": 25})),
        );

        let encoded = postcard::to_stdvec(&entry).unwrap();
        let restored: LogEntry = postcard::from_bytes(&encoded).unwrap();

        assert_eq!(entry.entry_type, restored.entry_type);
        assert_eq!(entry.data, restored.data);
        assert_eq!(entry.collection_str(), restored.collection_str());
        assert_eq!(entry.document_id_str(), restored.document_id_str());
    }
}