1use super::error::BinaryError;
7use lnmp_core::{FieldId, LnmpRecord, LnmpValue};
8
/// Kind of change carried by a single [`DeltaOp`]; each variant's discriminant is
/// the one-byte operation code written to the wire.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeltaOperation {
    /// Write a field value, replacing any field that already has the target id.
    SetField = 0x01,
    /// Remove the target field.
    DeleteField = 0x02,
    /// Replace the value of a field that must already exist in the base record.
    UpdateField = 0x03,
    /// Apply a nested list of operations to a field that holds a nested record.
    MergeRecord = 0x04,
}
22
23impl DeltaOperation {
24 pub fn from_u8(byte: u8) -> Result<Self, DeltaError> {
26 match byte {
27 0x01 => Ok(DeltaOperation::SetField),
28 0x02 => Ok(DeltaOperation::DeleteField),
29 0x03 => Ok(DeltaOperation::UpdateField),
30 0x04 => Ok(DeltaOperation::MergeRecord),
31 _ => Err(DeltaError::InvalidOperation { op_code: byte }),
32 }
33 }
34
35 pub fn to_u8(self) -> u8 {
37 self as u8
38 }
39}
40
/// Tag byte that opens every encoded delta packet; the decoder rejects packets
/// whose first byte differs.
pub const DELTA_TAG: u8 = 0xB0;
43
44#[derive(Debug, Clone, PartialEq)]
46pub struct DeltaOp {
47 pub target_fid: FieldId,
49 pub operation: DeltaOperation,
51 pub payload: Vec<u8>,
53}
54
55impl DeltaOp {
56 pub fn new(target_fid: FieldId, operation: DeltaOperation, payload: Vec<u8>) -> Self {
58 Self {
59 target_fid,
60 operation,
61 payload,
62 }
63 }
64}
65
/// Switches controlling the delta encoder and decoder.
#[derive(Debug, Clone)]
pub struct DeltaConfig {
    /// When `false`, computing, decoding and applying deltas all fail with
    /// `DeltaApplicationFailed`.
    pub enable_delta: bool,
    /// Change-tracking flag; stored here but not read by the encoder or decoder
    /// in this module.
    pub track_changes: bool,
}
74
75impl DeltaConfig {
76 pub fn new() -> Self {
78 Self {
79 enable_delta: false,
80 track_changes: false,
81 }
82 }
83
84 pub fn with_enable_delta(mut self, enable: bool) -> Self {
86 self.enable_delta = enable;
87 self
88 }
89
90 pub fn with_track_changes(mut self, track: bool) -> Self {
92 self.track_changes = track;
93 self
94 }
95}
96
97impl Default for DeltaConfig {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
/// Optional metadata checked before a delta is applied. A field left as `None`
/// skips the corresponding check.
#[derive(Debug, Clone, Default)]
pub struct DeltaApplyContext {
    /// Base snapshot id taken from the delta's metadata; zero is rejected.
    pub metadata_base: Option<u64>,
    /// Base snapshot id the caller requires; compared with `metadata_base`
    /// only when both are present.
    pub required_base: Option<u64>,
    /// Delta algorithm code; values above `0x01` are rejected.
    pub algorithm: Option<u8>,
    /// Delta compression code; values above `0x01` are rejected.
    pub compression: Option<u8>,
}
115
116#[derive(Debug, Clone, PartialEq)]
118pub enum DeltaError {
119 InvalidTargetFid {
121 fid: FieldId,
123 },
124 InvalidOperation {
126 op_code: u8,
128 },
129 MergeConflict {
131 fid: FieldId,
133 reason: String,
135 },
136 DeltaApplicationFailed {
138 reason: String,
140 },
141 BinaryError {
143 source: BinaryError,
145 },
146}
147
148impl std::fmt::Display for DeltaError {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 match self {
151 DeltaError::InvalidTargetFid { fid } => {
152 write!(f, "Invalid target FID: {}", fid)
153 }
154 DeltaError::InvalidOperation { op_code } => {
155 write!(f, "Invalid operation code: 0x{:02X}", op_code)
156 }
157 DeltaError::MergeConflict { fid, reason } => {
158 write!(f, "Merge conflict at FID {}: {}", fid, reason)
159 }
160 DeltaError::DeltaApplicationFailed { reason } => {
161 write!(f, "Delta application failed: {}", reason)
162 }
163 DeltaError::BinaryError { source } => {
164 write!(f, "Binary error: {}", source)
165 }
166 }
167 }
168}
169
170impl std::error::Error for DeltaError {}
171
172impl From<BinaryError> for DeltaError {
173 fn from(err: BinaryError) -> Self {
174 DeltaError::BinaryError { source: err }
175 }
176}
177
178pub struct DeltaEncoder {
180 config: DeltaConfig,
181}
182
183impl DeltaEncoder {
184 pub fn new() -> Self {
186 Self {
187 config: DeltaConfig::default(),
188 }
189 }
190
191 pub fn with_config(config: DeltaConfig) -> Self {
193 Self { config }
194 }
195
196 pub fn compute_delta(
209 &self,
210 old: &LnmpRecord,
211 new: &LnmpRecord,
212 ) -> Result<Vec<DeltaOp>, DeltaError> {
213 if !self.config.enable_delta {
216 return Err(DeltaError::DeltaApplicationFailed {
217 reason: "Delta is disabled in configuration".to_string(),
218 });
219 }
220
221 self.diff_records(old, new)
222 }
223
224 fn diff_records(&self, old: &LnmpRecord, new: &LnmpRecord) -> Result<Vec<DeltaOp>, DeltaError> {
226 use std::collections::HashSet;
227
228 let mut ops = Vec::new();
229
230 let old_fids: HashSet<FieldId> = old.fields().iter().map(|f| f.fid).collect();
232 let new_fids: HashSet<FieldId> = new.fields().iter().map(|f| f.fid).collect();
233
234 let mut all_fids: Vec<FieldId> = old_fids.union(&new_fids).copied().collect();
236 all_fids.sort_unstable();
237
238 for fid in all_fids {
239 let old_field = old.get_field(fid);
240 let new_field = new.get_field(fid);
241
242 match (old_field, new_field) {
243 (None, Some(new_f)) => {
244 let payload = self.encode_value(&new_f.value)?;
246 ops.push(DeltaOp::new(fid, DeltaOperation::SetField, payload));
247 }
248 (Some(_), None) => {
249 ops.push(DeltaOp::new(fid, DeltaOperation::DeleteField, vec![]));
251 }
252 (Some(old_f), Some(new_f)) => {
253 if old_f.value != new_f.value {
255 match (&old_f.value, &new_f.value) {
257 (
258 LnmpValue::NestedRecord(old_rec),
259 LnmpValue::NestedRecord(new_rec),
260 ) => {
261 let nested_ops = self.diff_records(old_rec, new_rec)?;
263 let payload = self.encode_nested_ops(&nested_ops)?;
264 ops.push(DeltaOp::new(fid, DeltaOperation::MergeRecord, payload));
265 }
266 _ => {
267 let payload = self.encode_value(&new_f.value)?;
269 ops.push(DeltaOp::new(fid, DeltaOperation::UpdateField, payload));
270 }
271 }
272 }
273 }
275 (None, None) => {
276 unreachable!()
278 }
279 }
280 }
281
282 Ok(ops)
283 }
284
285 fn encode_value(&self, value: &LnmpValue) -> Result<Vec<u8>, DeltaError> {
287 use super::entry::BinaryEntry;
288 use super::types::BinaryValue;
289
290 let binary_value = BinaryValue::from_lnmp_value(value)?;
292
293 let entry = BinaryEntry::new(0, binary_value);
295
296 let full_encoding = entry.encode();
298
299 if full_encoding.len() >= 2 {
301 Ok(full_encoding[2..].to_vec())
302 } else {
303 Err(DeltaError::DeltaApplicationFailed {
304 reason: "Invalid value encoding".to_string(),
305 })
306 }
307 }
308
309 fn encode_nested_ops(&self, ops: &[DeltaOp]) -> Result<Vec<u8>, DeltaError> {
311 use super::varint;
312
313 let mut result = Vec::new();
314
315 let count_bytes = varint::encode(ops.len() as i64);
317 result.extend_from_slice(&count_bytes);
318
319 for op in ops {
321 let fid_bytes = varint::encode(op.target_fid as i64);
323 result.extend_from_slice(&fid_bytes);
324
325 result.push(op.operation.to_u8());
327
328 let payload_len_bytes = varint::encode(op.payload.len() as i64);
330 result.extend_from_slice(&payload_len_bytes);
331
332 result.extend_from_slice(&op.payload);
334 }
335
336 Ok(result)
337 }
338
339 pub fn encode_delta(&self, ops: &[DeltaOp]) -> Result<Vec<u8>, DeltaError> {
349 use super::varint;
350
351 let mut result = Vec::new();
352
353 result.push(DELTA_TAG);
355
356 let count_bytes = varint::encode(ops.len() as i64);
358 result.extend_from_slice(&count_bytes);
359
360 for op in ops {
362 let fid_bytes = varint::encode(op.target_fid as i64);
364 result.extend_from_slice(&fid_bytes);
365
366 result.push(op.operation.to_u8());
368
369 let payload_len_bytes = varint::encode(op.payload.len() as i64);
371 result.extend_from_slice(&payload_len_bytes);
372
373 result.extend_from_slice(&op.payload);
375 }
376
377 Ok(result)
378 }
379}
380
381fn validate_apply_context(ctx: &DeltaApplyContext) -> Result<(), DeltaError> {
382 if let Some(alg) = ctx.algorithm {
383 if alg > 0x01 {
384 return Err(DeltaError::DeltaApplicationFailed {
385 reason: format!("unsupported delta algorithm 0x{alg:02X}"),
386 });
387 }
388 }
389 if let Some(comp) = ctx.compression {
390 if comp > 0x01 {
391 return Err(DeltaError::DeltaApplicationFailed {
392 reason: format!("unsupported delta compression 0x{comp:02X}"),
393 });
394 }
395 }
396 if let Some(meta_base) = ctx.metadata_base {
397 if meta_base == 0 {
398 return Err(DeltaError::DeltaApplicationFailed {
399 reason: "delta metadata base snapshot is zero".to_string(),
400 });
401 }
402 if let Some(required) = ctx.required_base {
403 if meta_base != required {
404 return Err(DeltaError::DeltaApplicationFailed {
405 reason: format!(
406 "delta base snapshot mismatch (metadata={meta_base}, required={required})"
407 ),
408 });
409 }
410 }
411 }
412 Ok(())
413}
414impl Default for DeltaEncoder {
415 fn default() -> Self {
416 Self::new()
417 }
418}
419
420pub struct DeltaDecoder {
422 config: DeltaConfig,
423}
424
425impl DeltaDecoder {
426 pub fn new() -> Self {
428 Self {
429 config: DeltaConfig::default(),
430 }
431 }
432
433 pub fn with_config(config: DeltaConfig) -> Self {
435 Self { config }
436 }
437
438 pub fn decode_delta(&self, bytes: &[u8]) -> Result<Vec<DeltaOp>, DeltaError> {
452 if !self.config.enable_delta {
453 return Err(DeltaError::DeltaApplicationFailed {
454 reason: "Delta is disabled in configuration".to_string(),
455 });
456 }
457 use super::varint;
458
459 if bytes.is_empty() {
460 return Err(DeltaError::DeltaApplicationFailed {
461 reason: "Empty delta packet".to_string(),
462 });
463 }
464
465 let mut offset = 0;
466
467 if bytes[offset] != DELTA_TAG {
469 return Err(DeltaError::DeltaApplicationFailed {
470 reason: format!(
471 "Invalid delta tag: expected 0xB0, found 0x{:02X}",
472 bytes[offset]
473 ),
474 });
475 }
476 offset += 1;
477
478 let (count, consumed) = varint::decode(&bytes[offset..])?;
480 offset += consumed;
481
482 if count < 0 {
483 return Err(DeltaError::DeltaApplicationFailed {
484 reason: format!("Negative operation count: {}", count),
485 });
486 }
487
488 let count = count as usize;
489 let mut ops = Vec::with_capacity(count);
490
491 for _ in 0..count {
493 let (fid, consumed) = varint::decode(&bytes[offset..])?;
495 offset += consumed;
496
497 if fid < 0 || fid > u16::MAX as i64 {
498 return Err(DeltaError::InvalidTargetFid { fid: fid as u16 });
499 }
500 let target_fid = fid as u16;
501
502 if offset >= bytes.len() {
504 return Err(DeltaError::DeltaApplicationFailed {
505 reason: "Unexpected end of delta packet".to_string(),
506 })?;
507 }
508 let operation = DeltaOperation::from_u8(bytes[offset])?;
509 offset += 1;
510
511 let (payload_len, consumed) = varint::decode(&bytes[offset..])?;
513 offset += consumed;
514
515 if payload_len < 0 {
516 return Err(DeltaError::DeltaApplicationFailed {
517 reason: format!("Negative payload length: {}", payload_len),
518 });
519 }
520
521 let payload_len = payload_len as usize;
522 if offset + payload_len > bytes.len() {
523 return Err(DeltaError::DeltaApplicationFailed {
524 reason: "Payload length exceeds available data".to_string(),
525 })?;
526 }
527
528 let payload = bytes[offset..offset + payload_len].to_vec();
530 offset += payload_len;
531
532 ops.push(DeltaOp::new(target_fid, operation, payload));
533 }
534
535 Ok(ops)
536 }
537
538 pub fn apply_delta(&self, base: &mut LnmpRecord, ops: &[DeltaOp]) -> Result<(), DeltaError> {
540 self.apply_delta_with_context(base, ops, &DeltaApplyContext::default())
541 }
542
543 pub fn apply_delta_with_context(
552 &self,
553 base: &mut LnmpRecord,
554 ops: &[DeltaOp],
555 ctx: &DeltaApplyContext,
556 ) -> Result<(), DeltaError> {
557 if !self.config.enable_delta {
558 return Err(DeltaError::DeltaApplicationFailed {
559 reason: "Delta is disabled in configuration".to_string(),
560 });
561 }
562
563 validate_apply_context(ctx)?;
564 use lnmp_core::LnmpField;
565
566 for op in ops {
567 match op.operation {
569 DeltaOperation::UpdateField | DeltaOperation::MergeRecord => {
570 if base.get_field(op.target_fid).is_none() {
571 return Err(DeltaError::InvalidTargetFid { fid: op.target_fid });
572 }
573 }
574 _ => {}
575 }
576
577 match op.operation {
578 DeltaOperation::SetField => {
579 let value = self.decode_value(&op.payload)?;
581 base.remove_field(op.target_fid);
583 base.add_field(LnmpField {
584 fid: op.target_fid,
585 value,
586 });
587 }
588 DeltaOperation::DeleteField => {
589 base.remove_field(op.target_fid);
591 }
592 DeltaOperation::UpdateField => {
593 let value = self.decode_value(&op.payload)?;
595 base.remove_field(op.target_fid);
597 base.add_field(LnmpField {
598 fid: op.target_fid,
599 value,
600 });
601 }
602 DeltaOperation::MergeRecord => {
603 let existing_field = base
605 .get_field(op.target_fid)
606 .ok_or(DeltaError::InvalidTargetFid { fid: op.target_fid })?;
607
608 match &existing_field.value {
609 LnmpValue::NestedRecord(existing_rec) => {
610 let nested_ops = self.decode_nested_ops(&op.payload)?;
612
613 let mut updated_rec = (**existing_rec).clone();
615 self.apply_delta(&mut updated_rec, &nested_ops)?;
616
617 base.remove_field(op.target_fid);
619 base.add_field(LnmpField {
620 fid: op.target_fid,
621 value: LnmpValue::NestedRecord(Box::new(updated_rec)),
622 });
623 }
624 _ => {
625 return Err(DeltaError::MergeConflict {
626 fid: op.target_fid,
627 reason: "Target field is not a nested record".to_string(),
628 });
629 }
630 }
631 }
632 }
633 }
634
635 Ok(())
636 }
637
638 fn decode_value(&self, payload: &[u8]) -> Result<LnmpValue, DeltaError> {
640 use super::entry::BinaryEntry;
641
642 if payload.is_empty() {
643 return Err(DeltaError::DeltaApplicationFailed {
644 reason: "Empty value payload".to_string(),
645 });
646 }
647
648 let mut entry_bytes = vec![0x00, 0x00]; entry_bytes.extend_from_slice(payload);
653
654 let (entry, _) = BinaryEntry::decode(&entry_bytes)?;
655 Ok(entry.value.to_lnmp_value())
656 }
657
658 fn decode_nested_ops(&self, payload: &[u8]) -> Result<Vec<DeltaOp>, DeltaError> {
660 use super::varint;
661
662 let mut offset = 0;
663
664 let (count, consumed) = varint::decode(&payload[offset..])?;
666 offset += consumed;
667
668 if count < 0 {
669 return Err(DeltaError::DeltaApplicationFailed {
670 reason: format!("Negative nested operation count: {}", count),
671 });
672 }
673
674 let count = count as usize;
675 let mut ops = Vec::with_capacity(count);
676
677 for _ in 0..count {
679 let (fid, consumed) = varint::decode(&payload[offset..])?;
681 offset += consumed;
682
683 if fid < 0 || fid > u16::MAX as i64 {
684 return Err(DeltaError::InvalidTargetFid { fid: fid as u16 });
685 }
686 let target_fid = fid as u16;
687
688 if offset >= payload.len() {
690 return Err(DeltaError::DeltaApplicationFailed {
691 reason: "Unexpected end of nested operations".to_string(),
692 })?;
693 }
694 let operation = DeltaOperation::from_u8(payload[offset])?;
695 offset += 1;
696
697 let (payload_len, consumed) = varint::decode(&payload[offset..])?;
699 offset += consumed;
700
701 if payload_len < 0 {
702 return Err(DeltaError::DeltaApplicationFailed {
703 reason: format!("Negative nested payload length: {}", payload_len),
704 });
705 }
706
707 let payload_len = payload_len as usize;
708 if offset + payload_len > payload.len() {
709 return Err(DeltaError::DeltaApplicationFailed {
710 reason: "Nested payload length exceeds available data".to_string(),
711 })?;
712 }
713
714 let op_payload = payload[offset..offset + payload_len].to_vec();
716 offset += payload_len;
717
718 ops.push(DeltaOp::new(target_fid, operation, op_payload));
719 }
720
721 Ok(ops)
722 }
723}
724
725impl Default for DeltaDecoder {
726 fn default() -> Self {
727 Self::new()
728 }
729}
730
#[cfg(test)]
mod tests {
    // NOTE(review): no test below uses an approximate float constant; the lint
    // allowance is kept as found.
    #![allow(clippy::approx_constant)]

    use super::*;

    /// Every operation paired with its wire byte.
    const OP_CODES: [(u8, DeltaOperation); 4] = [
        (0x01, DeltaOperation::SetField),
        (0x02, DeltaOperation::DeleteField),
        (0x03, DeltaOperation::UpdateField),
        (0x04, DeltaOperation::MergeRecord),
    ];

    #[test]
    fn test_delta_operation_from_u8() {
        for &(byte, expected) in OP_CODES.iter() {
            assert_eq!(DeltaOperation::from_u8(byte).unwrap(), expected);
        }
    }

    #[test]
    fn test_delta_operation_from_u8_invalid() {
        for &byte in [0x00u8, 0x05, 0xFF].iter() {
            assert!(DeltaOperation::from_u8(byte).is_err());
        }
    }

    #[test]
    fn test_delta_operation_to_u8() {
        for &(byte, op) in OP_CODES.iter() {
            assert_eq!(op.to_u8(), byte);
        }
    }

    #[test]
    fn test_delta_operation_round_trip() {
        for &(_, op) in OP_CODES.iter() {
            let parsed = DeltaOperation::from_u8(op.to_u8()).unwrap();
            assert_eq!(parsed, op);
        }
    }

    #[test]
    fn test_delta_tag_constant() {
        assert_eq!(DELTA_TAG, 0xB0);
    }

    #[test]
    fn test_delta_op_new() {
        let payload = vec![0x01, 0x02, 0x03];
        let op = DeltaOp::new(12, DeltaOperation::SetField, payload.clone());
        assert_eq!(op.target_fid, 12);
        assert_eq!(op.operation, DeltaOperation::SetField);
        assert_eq!(op.payload, payload);
    }

    #[test]
    fn test_delta_config_default() {
        let config = DeltaConfig::new();
        assert!(!config.enable_delta);
        assert!(!config.track_changes);
    }

    #[test]
    fn test_delta_config_with_enable_delta() {
        let config = DeltaConfig::new().with_enable_delta(true);
        assert!(config.enable_delta);
        assert!(!config.track_changes);
    }

    #[test]
    fn test_delta_config_with_track_changes() {
        let config = DeltaConfig::new().with_track_changes(true);
        assert!(!config.enable_delta);
        assert!(config.track_changes);
    }

    #[test]
    fn test_delta_config_builder() {
        let config = DeltaConfig::new()
            .with_enable_delta(true)
            .with_track_changes(true);
        assert!(config.enable_delta);
        assert!(config.track_changes);
    }

    #[test]
    fn test_compute_delta_with_enable_flag() {
        use lnmp_core::{LnmpField, LnmpValue};

        let mut base = LnmpRecord::new();
        base.add_field(LnmpField {
            fid: 1,
            value: LnmpValue::Int(1),
        });
        base.add_field(LnmpField {
            fid: 2,
            value: LnmpValue::String("a".to_string()),
        });

        // Same record with field 1 changed from 1 to 2.
        let mut updated = base.clone();
        updated.remove_field(1);
        updated.add_field(LnmpField {
            fid: 1,
            value: LnmpValue::Int(2),
        });

        let encoder = DeltaEncoder::with_config(DeltaConfig::new().with_enable_delta(true));
        let ops = encoder.compute_delta(&base, &updated).unwrap();

        assert_eq!(ops.len(), 1);
        assert_eq!(ops[0].target_fid, 1);
        assert_eq!(ops[0].operation, DeltaOperation::UpdateField);
    }

    #[test]
    fn test_delta_error_display_invalid_target_fid() {
        let msg = DeltaError::InvalidTargetFid { fid: 999 }.to_string();
        assert!(msg.contains("Invalid target FID"));
        assert!(msg.contains("999"));
    }

    #[test]
    fn test_delta_error_display_invalid_operation() {
        let msg = DeltaError::InvalidOperation { op_code: 0xFF }.to_string();
        assert!(msg.contains("Invalid operation code"));
        assert!(msg.contains("0xFF"));
    }

    #[test]
    fn test_delta_error_display_merge_conflict() {
        let msg = DeltaError::MergeConflict {
            fid: 42,
            reason: "Type mismatch".to_string(),
        }
        .to_string();
        assert!(msg.contains("Merge conflict"));
        assert!(msg.contains("42"));
        assert!(msg.contains("Type mismatch"));
    }

    #[test]
    fn test_delta_error_display_application_failed() {
        let msg = DeltaError::DeltaApplicationFailed {
            reason: "Field not found".to_string(),
        }
        .to_string();
        assert!(msg.contains("Delta application failed"));
        assert!(msg.contains("Field not found"));
    }
}