1use super::error::BinaryError;
7use lnmp_core::{FieldId, LnmpRecord, LnmpValue};
8
/// Kind of change carried by a single [`DeltaOp`].
///
/// The explicit discriminants are the byte values written to and read from
/// encoded delta packets.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeltaOperation {
    /// Emitted for a field that exists only in the new record; applying it
    /// stores the decoded payload under the target FID.
    SetField = 0x01,
    /// Emitted for a field that exists only in the old record; applying it
    /// removes the target FID.
    DeleteField = 0x02,
    /// Emitted for a field present in both records with a different value;
    /// applying it requires the target FID to already exist.
    UpdateField = 0x03,
    /// Emitted when both the old and the new value are nested records that
    /// differ; the payload holds the nested operations.
    MergeRecord = 0x04,
}
22
23impl DeltaOperation {
24 pub fn from_u8(byte: u8) -> Result<Self, DeltaError> {
26 match byte {
27 0x01 => Ok(DeltaOperation::SetField),
28 0x02 => Ok(DeltaOperation::DeleteField),
29 0x03 => Ok(DeltaOperation::UpdateField),
30 0x04 => Ok(DeltaOperation::MergeRecord),
31 _ => Err(DeltaError::InvalidOperation { op_code: byte }),
32 }
33 }
34
35 pub fn to_u8(self) -> u8 {
37 self as u8
38 }
39}
40
/// Leading byte of an encoded delta packet: written first by
/// `DeltaEncoder::encode_delta` and required by `DeltaDecoder::decode_delta`.
pub const DELTA_TAG: u8 = 0xB0;
43
44#[derive(Debug, Clone, PartialEq)]
46pub struct DeltaOp {
47 pub target_fid: FieldId,
49 pub operation: DeltaOperation,
51 pub payload: Vec<u8>,
53}
54
55impl DeltaOp {
56 pub fn new(target_fid: FieldId, operation: DeltaOperation, payload: Vec<u8>) -> Self {
58 Self {
59 target_fid,
60 operation,
61 payload,
62 }
63 }
64}
65
/// Settings shared by the delta encoder and decoder.
#[derive(Debug, Clone)]
pub struct DeltaConfig {
    /// Master switch. While `false`, `compute_delta`, `decode_delta` and
    /// `apply_delta` return an error instead of doing any work.
    pub enable_delta: bool,
    /// Change-tracking flag. It is stored here but not consulted by the
    /// encoder or decoder code in this file.
    pub track_changes: bool,
}

impl DeltaConfig {
    /// Creates a configuration with both flags turned off.
    #[must_use]
    pub fn new() -> Self {
        Self {
            enable_delta: false,
            track_changes: false,
        }
    }

    /// Returns the configuration with `enable_delta` set to `enable`.
    ///
    /// The method consumes `self`, so the returned value must be kept;
    /// discarding it leaves no configuration behind.
    #[must_use]
    pub fn with_enable_delta(mut self, enable: bool) -> Self {
        self.enable_delta = enable;
        self
    }

    /// Returns the configuration with `track_changes` set to `track`.
    ///
    /// The method consumes `self`, so the returned value must be kept.
    #[must_use]
    pub fn with_track_changes(mut self, track: bool) -> Self {
        self.track_changes = track;
        self
    }
}

impl Default for DeltaConfig {
    /// Same as [`DeltaConfig::new`]: everything disabled.
    fn default() -> Self {
        Self::new()
    }
}
102
103#[derive(Debug, Clone, PartialEq)]
105pub enum DeltaError {
106 InvalidTargetFid {
108 fid: FieldId,
110 },
111 InvalidOperation {
113 op_code: u8,
115 },
116 MergeConflict {
118 fid: FieldId,
120 reason: String,
122 },
123 DeltaApplicationFailed {
125 reason: String,
127 },
128 BinaryError {
130 source: BinaryError,
132 },
133}
134
135impl std::fmt::Display for DeltaError {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 match self {
138 DeltaError::InvalidTargetFid { fid } => {
139 write!(f, "Invalid target FID: {}", fid)
140 }
141 DeltaError::InvalidOperation { op_code } => {
142 write!(f, "Invalid operation code: 0x{:02X}", op_code)
143 }
144 DeltaError::MergeConflict { fid, reason } => {
145 write!(f, "Merge conflict at FID {}: {}", fid, reason)
146 }
147 DeltaError::DeltaApplicationFailed { reason } => {
148 write!(f, "Delta application failed: {}", reason)
149 }
150 DeltaError::BinaryError { source } => {
151 write!(f, "Binary error: {}", source)
152 }
153 }
154 }
155}
156
157impl std::error::Error for DeltaError {}
158
159impl From<BinaryError> for DeltaError {
160 fn from(err: BinaryError) -> Self {
161 DeltaError::BinaryError { source: err }
162 }
163}
164
165pub struct DeltaEncoder {
167 config: DeltaConfig,
168}
169
170impl DeltaEncoder {
171 pub fn new() -> Self {
173 Self {
174 config: DeltaConfig::default(),
175 }
176 }
177
178 pub fn with_config(config: DeltaConfig) -> Self {
180 Self { config }
181 }
182
183 pub fn compute_delta(
196 &self,
197 old: &LnmpRecord,
198 new: &LnmpRecord,
199 ) -> Result<Vec<DeltaOp>, DeltaError> {
200 if !self.config.enable_delta {
203 return Err(DeltaError::DeltaApplicationFailed {
204 reason: "Delta is disabled in configuration".to_string(),
205 });
206 }
207
208 self.diff_records(old, new)
209 }
210
211 fn diff_records(&self, old: &LnmpRecord, new: &LnmpRecord) -> Result<Vec<DeltaOp>, DeltaError> {
213 use std::collections::HashSet;
214
215 let mut ops = Vec::new();
216
217 let old_fids: HashSet<FieldId> = old.fields().iter().map(|f| f.fid).collect();
219 let new_fids: HashSet<FieldId> = new.fields().iter().map(|f| f.fid).collect();
220
221 let mut all_fids: Vec<FieldId> = old_fids.union(&new_fids).copied().collect();
223 all_fids.sort_unstable();
224
225 for fid in all_fids {
226 let old_field = old.get_field(fid);
227 let new_field = new.get_field(fid);
228
229 match (old_field, new_field) {
230 (None, Some(new_f)) => {
231 let payload = self.encode_value(&new_f.value)?;
233 ops.push(DeltaOp::new(fid, DeltaOperation::SetField, payload));
234 }
235 (Some(_), None) => {
236 ops.push(DeltaOp::new(fid, DeltaOperation::DeleteField, vec![]));
238 }
239 (Some(old_f), Some(new_f)) => {
240 if old_f.value != new_f.value {
242 match (&old_f.value, &new_f.value) {
244 (
245 LnmpValue::NestedRecord(old_rec),
246 LnmpValue::NestedRecord(new_rec),
247 ) => {
248 let nested_ops = self.diff_records(old_rec, new_rec)?;
250 let payload = self.encode_nested_ops(&nested_ops)?;
251 ops.push(DeltaOp::new(fid, DeltaOperation::MergeRecord, payload));
252 }
253 _ => {
254 let payload = self.encode_value(&new_f.value)?;
256 ops.push(DeltaOp::new(fid, DeltaOperation::UpdateField, payload));
257 }
258 }
259 }
260 }
262 (None, None) => {
263 unreachable!()
265 }
266 }
267 }
268
269 Ok(ops)
270 }
271
272 fn encode_value(&self, value: &LnmpValue) -> Result<Vec<u8>, DeltaError> {
274 use super::entry::BinaryEntry;
275 use super::types::BinaryValue;
276
277 let binary_value = BinaryValue::from_lnmp_value(value)?;
279
280 let entry = BinaryEntry::new(0, binary_value);
282
283 let full_encoding = entry.encode();
285
286 if full_encoding.len() >= 2 {
288 Ok(full_encoding[2..].to_vec())
289 } else {
290 Err(DeltaError::DeltaApplicationFailed {
291 reason: "Invalid value encoding".to_string(),
292 })
293 }
294 }
295
296 fn encode_nested_ops(&self, ops: &[DeltaOp]) -> Result<Vec<u8>, DeltaError> {
298 use super::varint;
299
300 let mut result = Vec::new();
301
302 let count_bytes = varint::encode(ops.len() as i64);
304 result.extend_from_slice(&count_bytes);
305
306 for op in ops {
308 let fid_bytes = varint::encode(op.target_fid as i64);
310 result.extend_from_slice(&fid_bytes);
311
312 result.push(op.operation.to_u8());
314
315 let payload_len_bytes = varint::encode(op.payload.len() as i64);
317 result.extend_from_slice(&payload_len_bytes);
318
319 result.extend_from_slice(&op.payload);
321 }
322
323 Ok(result)
324 }
325
326 pub fn encode_delta(&self, ops: &[DeltaOp]) -> Result<Vec<u8>, DeltaError> {
336 use super::varint;
337
338 let mut result = Vec::new();
339
340 result.push(DELTA_TAG);
342
343 let count_bytes = varint::encode(ops.len() as i64);
345 result.extend_from_slice(&count_bytes);
346
347 for op in ops {
349 let fid_bytes = varint::encode(op.target_fid as i64);
351 result.extend_from_slice(&fid_bytes);
352
353 result.push(op.operation.to_u8());
355
356 let payload_len_bytes = varint::encode(op.payload.len() as i64);
358 result.extend_from_slice(&payload_len_bytes);
359
360 result.extend_from_slice(&op.payload);
362 }
363
364 Ok(result)
365 }
366}
367
368impl Default for DeltaEncoder {
369 fn default() -> Self {
370 Self::new()
371 }
372}
373
374pub struct DeltaDecoder {
376 config: DeltaConfig,
377}
378
379impl DeltaDecoder {
380 pub fn new() -> Self {
382 Self {
383 config: DeltaConfig::default(),
384 }
385 }
386
387 pub fn with_config(config: DeltaConfig) -> Self {
389 Self { config }
390 }
391
392 pub fn decode_delta(&self, bytes: &[u8]) -> Result<Vec<DeltaOp>, DeltaError> {
406 if !self.config.enable_delta {
407 return Err(DeltaError::DeltaApplicationFailed {
408 reason: "Delta is disabled in configuration".to_string(),
409 });
410 }
411 use super::varint;
412
413 if bytes.is_empty() {
414 return Err(DeltaError::DeltaApplicationFailed {
415 reason: "Empty delta packet".to_string(),
416 });
417 }
418
419 let mut offset = 0;
420
421 if bytes[offset] != DELTA_TAG {
423 return Err(DeltaError::DeltaApplicationFailed {
424 reason: format!(
425 "Invalid delta tag: expected 0xB0, found 0x{:02X}",
426 bytes[offset]
427 ),
428 });
429 }
430 offset += 1;
431
432 let (count, consumed) = varint::decode(&bytes[offset..])?;
434 offset += consumed;
435
436 if count < 0 {
437 return Err(DeltaError::DeltaApplicationFailed {
438 reason: format!("Negative operation count: {}", count),
439 });
440 }
441
442 let count = count as usize;
443 let mut ops = Vec::with_capacity(count);
444
445 for _ in 0..count {
447 let (fid, consumed) = varint::decode(&bytes[offset..])?;
449 offset += consumed;
450
451 if fid < 0 || fid > u16::MAX as i64 {
452 return Err(DeltaError::InvalidTargetFid { fid: fid as u16 });
453 }
454 let target_fid = fid as u16;
455
456 if offset >= bytes.len() {
458 return Err(DeltaError::DeltaApplicationFailed {
459 reason: "Unexpected end of delta packet".to_string(),
460 })?;
461 }
462 let operation = DeltaOperation::from_u8(bytes[offset])?;
463 offset += 1;
464
465 let (payload_len, consumed) = varint::decode(&bytes[offset..])?;
467 offset += consumed;
468
469 if payload_len < 0 {
470 return Err(DeltaError::DeltaApplicationFailed {
471 reason: format!("Negative payload length: {}", payload_len),
472 });
473 }
474
475 let payload_len = payload_len as usize;
476 if offset + payload_len > bytes.len() {
477 return Err(DeltaError::DeltaApplicationFailed {
478 reason: "Payload length exceeds available data".to_string(),
479 })?;
480 }
481
482 let payload = bytes[offset..offset + payload_len].to_vec();
484 offset += payload_len;
485
486 ops.push(DeltaOp::new(target_fid, operation, payload));
487 }
488
489 Ok(ops)
490 }
491
492 pub fn apply_delta(&self, base: &mut LnmpRecord, ops: &[DeltaOp]) -> Result<(), DeltaError> {
506 if !self.config.enable_delta {
507 return Err(DeltaError::DeltaApplicationFailed {
508 reason: "Delta is disabled in configuration".to_string(),
509 });
510 }
511 use lnmp_core::LnmpField;
512
513 for op in ops {
514 match op.operation {
516 DeltaOperation::UpdateField | DeltaOperation::MergeRecord => {
517 if base.get_field(op.target_fid).is_none() {
518 return Err(DeltaError::InvalidTargetFid { fid: op.target_fid });
519 }
520 }
521 _ => {}
522 }
523
524 match op.operation {
525 DeltaOperation::SetField => {
526 let value = self.decode_value(&op.payload)?;
528 base.remove_field(op.target_fid);
530 base.add_field(LnmpField {
531 fid: op.target_fid,
532 value,
533 });
534 }
535 DeltaOperation::DeleteField => {
536 base.remove_field(op.target_fid);
538 }
539 DeltaOperation::UpdateField => {
540 let value = self.decode_value(&op.payload)?;
542 base.remove_field(op.target_fid);
544 base.add_field(LnmpField {
545 fid: op.target_fid,
546 value,
547 });
548 }
549 DeltaOperation::MergeRecord => {
550 let existing_field = base
552 .get_field(op.target_fid)
553 .ok_or(DeltaError::InvalidTargetFid { fid: op.target_fid })?;
554
555 match &existing_field.value {
556 LnmpValue::NestedRecord(existing_rec) => {
557 let nested_ops = self.decode_nested_ops(&op.payload)?;
559
560 let mut updated_rec = (**existing_rec).clone();
562 self.apply_delta(&mut updated_rec, &nested_ops)?;
563
564 base.remove_field(op.target_fid);
566 base.add_field(LnmpField {
567 fid: op.target_fid,
568 value: LnmpValue::NestedRecord(Box::new(updated_rec)),
569 });
570 }
571 _ => {
572 return Err(DeltaError::MergeConflict {
573 fid: op.target_fid,
574 reason: "Target field is not a nested record".to_string(),
575 });
576 }
577 }
578 }
579 }
580 }
581
582 Ok(())
583 }
584
585 fn decode_value(&self, payload: &[u8]) -> Result<LnmpValue, DeltaError> {
587 use super::entry::BinaryEntry;
588
589 if payload.is_empty() {
590 return Err(DeltaError::DeltaApplicationFailed {
591 reason: "Empty value payload".to_string(),
592 });
593 }
594
595 let mut entry_bytes = vec![0x00, 0x00]; entry_bytes.extend_from_slice(payload);
600
601 let (entry, _) = BinaryEntry::decode(&entry_bytes)?;
602 Ok(entry.value.to_lnmp_value())
603 }
604
605 fn decode_nested_ops(&self, payload: &[u8]) -> Result<Vec<DeltaOp>, DeltaError> {
607 use super::varint;
608
609 let mut offset = 0;
610
611 let (count, consumed) = varint::decode(&payload[offset..])?;
613 offset += consumed;
614
615 if count < 0 {
616 return Err(DeltaError::DeltaApplicationFailed {
617 reason: format!("Negative nested operation count: {}", count),
618 });
619 }
620
621 let count = count as usize;
622 let mut ops = Vec::with_capacity(count);
623
624 for _ in 0..count {
626 let (fid, consumed) = varint::decode(&payload[offset..])?;
628 offset += consumed;
629
630 if fid < 0 || fid > u16::MAX as i64 {
631 return Err(DeltaError::InvalidTargetFid { fid: fid as u16 });
632 }
633 let target_fid = fid as u16;
634
635 if offset >= payload.len() {
637 return Err(DeltaError::DeltaApplicationFailed {
638 reason: "Unexpected end of nested operations".to_string(),
639 })?;
640 }
641 let operation = DeltaOperation::from_u8(payload[offset])?;
642 offset += 1;
643
644 let (payload_len, consumed) = varint::decode(&payload[offset..])?;
646 offset += consumed;
647
648 if payload_len < 0 {
649 return Err(DeltaError::DeltaApplicationFailed {
650 reason: format!("Negative nested payload length: {}", payload_len),
651 });
652 }
653
654 let payload_len = payload_len as usize;
655 if offset + payload_len > payload.len() {
656 return Err(DeltaError::DeltaApplicationFailed {
657 reason: "Nested payload length exceeds available data".to_string(),
658 })?;
659 }
660
661 let op_payload = payload[offset..offset + payload_len].to_vec();
663 offset += payload_len;
664
665 ops.push(DeltaOp::new(target_fid, operation, op_payload));
666 }
667
668 Ok(ops)
669 }
670}
671
672impl Default for DeltaDecoder {
673 fn default() -> Self {
674 Self::new()
675 }
676}
677
#[cfg(test)]
mod tests {
    #![allow(clippy::approx_constant)]

    use super::*;

    /// Every defined operation paired with its byte code.
    const OPERATION_CODES: [(u8, DeltaOperation); 4] = [
        (0x01, DeltaOperation::SetField),
        (0x02, DeltaOperation::DeleteField),
        (0x03, DeltaOperation::UpdateField),
        (0x04, DeltaOperation::MergeRecord),
    ];

    #[test]
    fn test_delta_operation_from_u8() {
        for (byte, expected) in OPERATION_CODES.iter().copied() {
            assert_eq!(DeltaOperation::from_u8(byte).unwrap(), expected);
        }
    }

    #[test]
    fn test_delta_operation_from_u8_invalid() {
        for byte in [0x00u8, 0x05, 0xFF].iter().copied() {
            assert!(DeltaOperation::from_u8(byte).is_err());
        }
    }

    #[test]
    fn test_delta_operation_to_u8() {
        for (byte, operation) in OPERATION_CODES.iter().copied() {
            assert_eq!(operation.to_u8(), byte);
        }
    }

    #[test]
    fn test_delta_operation_round_trip() {
        for (_, operation) in OPERATION_CODES.iter().copied() {
            let parsed = DeltaOperation::from_u8(operation.to_u8()).unwrap();
            assert_eq!(parsed, operation);
        }
    }

    #[test]
    fn test_delta_tag_constant() {
        assert_eq!(DELTA_TAG, 0xB0);
    }

    #[test]
    fn test_delta_op_new() {
        let payload = vec![0x01, 0x02, 0x03];
        let op = DeltaOp::new(12, DeltaOperation::SetField, payload.clone());

        assert_eq!(op.target_fid, 12);
        assert_eq!(op.operation, DeltaOperation::SetField);
        assert_eq!(op.payload, payload);
    }

    #[test]
    fn test_delta_config_default() {
        let DeltaConfig {
            enable_delta,
            track_changes,
        } = DeltaConfig::new();

        assert!(!enable_delta);
        assert!(!track_changes);
    }

    #[test]
    fn test_delta_config_with_enable_delta() {
        let config = DeltaConfig::new().with_enable_delta(true);

        assert!(config.enable_delta);
        assert!(!config.track_changes);
    }

    #[test]
    fn test_delta_config_with_track_changes() {
        let config = DeltaConfig::new().with_track_changes(true);

        assert!(!config.enable_delta);
        assert!(config.track_changes);
    }

    #[test]
    fn test_delta_config_builder() {
        let config = DeltaConfig::new()
            .with_enable_delta(true)
            .with_track_changes(true);

        assert!(config.enable_delta);
        assert!(config.track_changes);
    }

    #[test]
    fn test_compute_delta_with_enable_flag() {
        use lnmp_core::{LnmpField, LnmpValue};

        // Base record: fid 1 = Int(1), fid 2 = "a".
        let mut base = LnmpRecord::new();
        base.add_field(LnmpField {
            fid: 1,
            value: LnmpValue::Int(1),
        });
        base.add_field(LnmpField {
            fid: 2,
            value: LnmpValue::String("a".to_string()),
        });

        // Updated record: same as base except fid 1 = Int(2).
        let mut updated = base.clone();
        updated.remove_field(1);
        updated.add_field(LnmpField {
            fid: 1,
            value: LnmpValue::Int(2),
        });

        let encoder = DeltaEncoder::with_config(DeltaConfig::new().with_enable_delta(true));
        let ops = encoder.compute_delta(&base, &updated).unwrap();

        assert_eq!(ops.len(), 1);
        let only = &ops[0];
        assert_eq!(only.target_fid, 1);
        assert_eq!(only.operation, DeltaOperation::UpdateField);
    }

    #[test]
    fn test_delta_error_display_invalid_target_fid() {
        let msg = DeltaError::InvalidTargetFid { fid: 999 }.to_string();

        assert!(msg.contains("Invalid target FID"));
        assert!(msg.contains("999"));
    }

    #[test]
    fn test_delta_error_display_invalid_operation() {
        let msg = DeltaError::InvalidOperation { op_code: 0xFF }.to_string();

        assert!(msg.contains("Invalid operation code"));
        assert!(msg.contains("0xFF"));
    }

    #[test]
    fn test_delta_error_display_merge_conflict() {
        let msg = DeltaError::MergeConflict {
            fid: 42,
            reason: "Type mismatch".to_string(),
        }
        .to_string();

        assert!(msg.contains("Merge conflict"));
        assert!(msg.contains("42"));
        assert!(msg.contains("Type mismatch"));
    }

    #[test]
    fn test_delta_error_display_application_failed() {
        let msg = DeltaError::DeltaApplicationFailed {
            reason: "Field not found".to_string(),
        }
        .to_string();

        assert!(msg.contains("Delta application failed"));
        assert!(msg.contains("Field not found"));
    }
}