1#[cfg(not(feature = "std"))]
69use alloc::{boxed::Box, vec::Vec};
70
71#[cfg(feature = "std")]
72use std::sync::RwLock;
73
74#[cfg(not(feature = "std"))]
75use spin::RwLock;
76
77use core::any::Any;
78
79#[cfg(feature = "std")]
80use std::collections::HashMap;
81
82#[cfg(not(feature = "std"))]
83use hashbrown::HashMap;
84
85pub const APP_TYPE_MIN: u8 = 0xC0;
87
88pub const APP_TYPE_MAX: u8 = 0xCF;
90
91pub const APP_OP_BASE: u8 = 0x10;
94
95pub trait DocumentType: Clone + Send + Sync + 'static {
102 const TYPE_ID: u8;
104
105 const TYPE_NAME: &'static str;
107
108 fn identity(&self) -> (u32, u64);
113
114 fn encode(&self) -> Vec<u8>;
116
117 fn decode(data: &[u8]) -> Option<Self>
121 where
122 Self: Sized;
123
124 fn merge(&mut self, other: &Self) -> bool;
128
129 fn to_delta_op(&self) -> Option<AppOperation> {
134 None
135 }
136
137 fn apply_delta_op(&mut self, _op: &AppOperation) -> bool {
141 false
142 }
143}
144
145#[derive(Debug, Clone)]
149pub struct AppOperation {
150 pub type_id: u8,
152
153 pub op_code: u8,
155
156 pub source_node: u32,
158
159 pub timestamp: u64,
161
162 pub payload: Vec<u8>,
164}
165
166impl AppOperation {
167 pub fn new(type_id: u8, op_code: u8, source_node: u32, timestamp: u64) -> Self {
169 Self {
170 type_id,
171 op_code,
172 source_node,
173 timestamp,
174 payload: Vec::new(),
175 }
176 }
177
178 pub fn with_payload(mut self, payload: Vec<u8>) -> Self {
180 self.payload = payload;
181 self
182 }
183
184 pub fn is_app_op_type(op_type: u8) -> bool {
186 (APP_OP_BASE..APP_OP_BASE + 16).contains(&op_type)
187 }
188
189 pub fn op_type_byte(&self) -> u8 {
191 APP_OP_BASE + (self.type_id - APP_TYPE_MIN)
192 }
193
194 pub fn encode(&self) -> Vec<u8> {
206 let mut buf = Vec::with_capacity(16 + self.payload.len());
207
208 buf.push(self.op_type_byte());
209 buf.push(self.op_code);
210 buf.extend_from_slice(&self.source_node.to_le_bytes());
211 buf.extend_from_slice(&self.timestamp.to_le_bytes());
212 buf.extend_from_slice(&(self.payload.len() as u16).to_le_bytes());
213 buf.extend_from_slice(&self.payload);
214
215 buf
216 }
217
218 pub fn decode(data: &[u8]) -> Option<(Self, usize)> {
222 if data.len() < 16 {
224 return None;
225 }
226
227 let op_type = data[0];
228 if !Self::is_app_op_type(op_type) {
229 return None;
230 }
231
232 let type_id = APP_TYPE_MIN + (op_type - APP_OP_BASE);
233 let op_code = data[1];
234 let source_node = u32::from_le_bytes([data[2], data[3], data[4], data[5]]);
235 let timestamp = u64::from_le_bytes([
236 data[6], data[7], data[8], data[9], data[10], data[11], data[12], data[13],
237 ]);
238 let payload_len = u16::from_le_bytes([data[14], data[15]]) as usize;
239
240 if data.len() < 16 + payload_len {
241 return None;
242 }
243
244 let payload = data[16..16 + payload_len].to_vec();
245
246 Some((
247 Self {
248 type_id,
249 op_code,
250 source_node,
251 timestamp,
252 payload,
253 },
254 16 + payload_len,
255 ))
256 }
257}
258
259trait DocumentHandler: Send + Sync {
263 fn type_name(&self) -> &'static str;
265
266 fn decode(&self, data: &[u8]) -> Option<Box<dyn Any + Send + Sync>>;
268
269 fn merge(&self, doc: &mut dyn Any, other: &dyn Any) -> bool;
271
272 fn encode(&self, doc: &dyn Any) -> Vec<u8>;
274
275 fn identity(&self, doc: &dyn Any) -> (u32, u64);
277
278 fn to_delta_op(&self, doc: &dyn Any) -> Option<AppOperation>;
280
281 fn apply_delta_op(&self, doc: &mut dyn Any, op: &AppOperation) -> bool;
283}
284
285struct TypedHandler<T: DocumentType> {
287 _marker: core::marker::PhantomData<T>,
288}
289
290impl<T: DocumentType> Default for TypedHandler<T> {
291 fn default() -> Self {
292 Self {
293 _marker: core::marker::PhantomData,
294 }
295 }
296}
297
298impl<T: DocumentType> DocumentHandler for TypedHandler<T> {
299 fn type_name(&self) -> &'static str {
300 T::TYPE_NAME
301 }
302
303 fn decode(&self, data: &[u8]) -> Option<Box<dyn Any + Send + Sync>> {
304 T::decode(data).map(|doc| Box::new(doc) as Box<dyn Any + Send + Sync>)
305 }
306
307 fn merge(&self, doc: &mut dyn Any, other: &dyn Any) -> bool {
308 if let (Some(doc), Some(other)) = (doc.downcast_mut::<T>(), other.downcast_ref::<T>()) {
309 doc.merge(other)
310 } else {
311 false
312 }
313 }
314
315 fn encode(&self, doc: &dyn Any) -> Vec<u8> {
316 doc.downcast_ref::<T>()
317 .map(|d| d.encode())
318 .unwrap_or_default()
319 }
320
321 fn identity(&self, doc: &dyn Any) -> (u32, u64) {
322 doc.downcast_ref::<T>()
323 .map(|d| d.identity())
324 .unwrap_or((0, 0))
325 }
326
327 fn to_delta_op(&self, doc: &dyn Any) -> Option<AppOperation> {
328 doc.downcast_ref::<T>().and_then(|d| d.to_delta_op())
329 }
330
331 fn apply_delta_op(&self, doc: &mut dyn Any, op: &AppOperation) -> bool {
332 doc.downcast_mut::<T>()
333 .map(|d| d.apply_delta_op(op))
334 .unwrap_or(false)
335 }
336}
337
338pub struct DocumentRegistry {
342 handlers: RwLock<HashMap<u8, Box<dyn DocumentHandler>>>,
343}
344
345impl Default for DocumentRegistry {
346 fn default() -> Self {
347 Self::new()
348 }
349}
350
351impl DocumentRegistry {
352 pub fn new() -> Self {
354 Self {
355 handlers: RwLock::new(HashMap::new()),
356 }
357 }
358
359 pub fn register<T: DocumentType>(&self) {
367 let type_id = T::TYPE_ID;
368
369 assert!(
370 (APP_TYPE_MIN..=APP_TYPE_MAX).contains(&type_id),
371 "TYPE_ID 0x{:02X} is outside valid range 0xC0-0xCF",
372 type_id
373 );
374
375 let handlers = self.handlers.write();
376 #[cfg(feature = "std")]
377 let mut handlers = handlers.unwrap();
378 #[cfg(not(feature = "std"))]
379 let mut handlers = handlers;
380
381 assert!(
382 !handlers.contains_key(&type_id),
383 "TYPE_ID 0x{:02X} is already registered",
384 type_id
385 );
386
387 handlers.insert(type_id, Box::new(TypedHandler::<T>::default()));
388 }
389
390 pub fn try_register<T: DocumentType>(&self) -> bool {
392 let type_id = T::TYPE_ID;
393
394 if !(APP_TYPE_MIN..=APP_TYPE_MAX).contains(&type_id) {
395 return false;
396 }
397
398 let handlers = self.handlers.write();
399 #[cfg(feature = "std")]
400 let mut handlers = handlers.unwrap();
401 #[cfg(not(feature = "std"))]
402 let mut handlers = handlers;
403
404 if handlers.contains_key(&type_id) {
405 return false;
406 }
407
408 handlers.insert(type_id, Box::new(TypedHandler::<T>::default()));
409 true
410 }
411
412 pub fn is_registered(&self, type_id: u8) -> bool {
414 let handlers = self.handlers.read();
415 #[cfg(feature = "std")]
416 let handlers = handlers.unwrap();
417
418 handlers.contains_key(&type_id)
419 }
420
421 pub fn is_app_type(type_id: u8) -> bool {
423 (APP_TYPE_MIN..=APP_TYPE_MAX).contains(&type_id)
424 }
425
426 pub fn type_name(&self, type_id: u8) -> Option<&'static str> {
428 let handlers = self.handlers.read();
429 #[cfg(feature = "std")]
430 let handlers = handlers.unwrap();
431
432 handlers.get(&type_id).map(|h| h.type_name())
433 }
434
435 pub fn registered_types(&self) -> Vec<u8> {
437 let handlers = self.handlers.read();
438 #[cfg(feature = "std")]
439 let handlers = handlers.unwrap();
440
441 handlers.keys().copied().collect()
442 }
443
444 pub fn decode(&self, type_id: u8, data: &[u8]) -> Option<Box<dyn Any + Send + Sync>> {
446 let handlers = self.handlers.read();
447 #[cfg(feature = "std")]
448 let handlers = handlers.unwrap();
449
450 handlers.get(&type_id).and_then(|h| h.decode(data))
451 }
452
453 pub fn merge(&self, type_id: u8, doc: &mut dyn Any, other: &dyn Any) -> bool {
455 let handlers = self.handlers.read();
456 #[cfg(feature = "std")]
457 let handlers = handlers.unwrap();
458
459 handlers
460 .get(&type_id)
461 .map(|h| h.merge(doc, other))
462 .unwrap_or(false)
463 }
464
465 pub fn encode(&self, type_id: u8, doc: &dyn Any) -> Vec<u8> {
467 let handlers = self.handlers.read();
468 #[cfg(feature = "std")]
469 let handlers = handlers.unwrap();
470
471 handlers
472 .get(&type_id)
473 .map(|h| h.encode(doc))
474 .unwrap_or_default()
475 }
476
477 pub fn identity(&self, type_id: u8, doc: &dyn Any) -> Option<(u32, u64)> {
479 let handlers = self.handlers.read();
480 #[cfg(feature = "std")]
481 let handlers = handlers.unwrap();
482
483 handlers.get(&type_id).map(|h| h.identity(doc))
484 }
485
486 pub fn to_delta_op(&self, type_id: u8, doc: &dyn Any) -> Option<AppOperation> {
488 let handlers = self.handlers.read();
489 #[cfg(feature = "std")]
490 let handlers = handlers.unwrap();
491
492 handlers.get(&type_id).and_then(|h| h.to_delta_op(doc))
493 }
494
495 pub fn apply_delta_op(&self, type_id: u8, doc: &mut dyn Any, op: &AppOperation) -> bool {
497 let handlers = self.handlers.read();
498 #[cfg(feature = "std")]
499 let handlers = handlers.unwrap();
500
501 handlers
502 .get(&type_id)
503 .map(|h| h.apply_delta_op(doc, op))
504 .unwrap_or(false)
505 }
506}
507
508pub fn decode_typed<T: DocumentType>(data: &[u8]) -> Option<T> {
510 T::decode(data)
511}
512
513pub fn encode_with_header<T: DocumentType>(doc: &T) -> Vec<u8> {
523 let payload = doc.encode();
524 let mut buf = Vec::with_capacity(4 + payload.len());
525
526 buf.push(T::TYPE_ID);
527 buf.push(0x00); buf.extend_from_slice(&(payload.len() as u16).to_le_bytes());
529 buf.extend_from_slice(&payload);
530
531 buf
532}
533
534pub fn decode_header(data: &[u8]) -> Option<(u8, &[u8])> {
538 if data.len() < 4 {
539 return None;
540 }
541
542 let type_id = data[0];
543 if !DocumentRegistry::is_app_type(type_id) {
544 return None;
545 }
546
547 let _flags = data[1];
548 let length = u16::from_le_bytes([data[2], data[3]]) as usize;
549
550 if data.len() < 4 + length {
551 return None;
552 }
553
554 Some((type_id, &data[4..4 + length]))
555}
556
557#[cfg(test)]
558mod tests {
559 use super::*;
560
561 #[derive(Clone, Debug, PartialEq)]
562 struct TestMessage {
563 source_node: u32,
564 timestamp: u64,
565 content: String,
566 ack_count: u32,
567 }
568
569 impl DocumentType for TestMessage {
570 const TYPE_ID: u8 = 0xC0;
571 const TYPE_NAME: &'static str = "TestMessage";
572
573 fn identity(&self) -> (u32, u64) {
574 (self.source_node, self.timestamp)
575 }
576
577 fn encode(&self) -> Vec<u8> {
578 let mut buf = Vec::new();
579 buf.extend_from_slice(&self.source_node.to_le_bytes());
580 buf.extend_from_slice(&self.timestamp.to_le_bytes());
581 buf.extend_from_slice(&self.ack_count.to_le_bytes());
582 buf.push(self.content.len() as u8);
583 buf.extend_from_slice(self.content.as_bytes());
584 buf
585 }
586
587 fn decode(data: &[u8]) -> Option<Self> {
588 if data.len() < 17 {
589 return None;
590 }
591 let source_node = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
592 let timestamp = u64::from_le_bytes([
593 data[4], data[5], data[6], data[7], data[8], data[9], data[10], data[11],
594 ]);
595 let ack_count = u32::from_le_bytes([data[12], data[13], data[14], data[15]]);
596 let content_len = data[16] as usize;
597 if data.len() < 17 + content_len {
598 return None;
599 }
600 let content = String::from_utf8_lossy(&data[17..17 + content_len]).to_string();
601 Some(Self {
602 source_node,
603 timestamp,
604 content,
605 ack_count,
606 })
607 }
608
609 fn merge(&mut self, other: &Self) -> bool {
610 if self.identity() != other.identity() {
611 return false;
612 }
613 if other.ack_count > self.ack_count {
614 self.ack_count = other.ack_count;
615 return true;
616 }
617 false
618 }
619
620 fn to_delta_op(&self) -> Option<AppOperation> {
621 Some(
622 AppOperation::new(Self::TYPE_ID, 0x01, self.source_node, self.timestamp)
623 .with_payload(self.ack_count.to_le_bytes().to_vec()),
624 )
625 }
626 }
627
628 #[test]
629 fn test_registry_register() {
630 let registry = DocumentRegistry::new();
631 registry.register::<TestMessage>();
632
633 assert!(registry.is_registered(0xC0));
634 assert!(!registry.is_registered(0xC1));
635 assert_eq!(registry.type_name(0xC0), Some("TestMessage"));
636 }
637
638 #[test]
639 fn test_registry_try_register() {
640 let registry = DocumentRegistry::new();
641
642 assert!(registry.try_register::<TestMessage>());
643 assert!(!registry.try_register::<TestMessage>()); }
645
646 #[test]
647 #[should_panic(expected = "outside valid range")]
648 fn test_registry_invalid_type_id() {
649 #[derive(Clone)]
650 struct BadType;
651
652 impl DocumentType for BadType {
653 const TYPE_ID: u8 = 0xAB; const TYPE_NAME: &'static str = "BadType";
655
656 fn identity(&self) -> (u32, u64) {
657 (0, 0)
658 }
659 fn encode(&self) -> Vec<u8> {
660 vec![]
661 }
662 fn decode(_: &[u8]) -> Option<Self> {
663 None
664 }
665 fn merge(&mut self, _: &Self) -> bool {
666 false
667 }
668 }
669
670 let registry = DocumentRegistry::new();
671 registry.register::<BadType>();
672 }
673
674 #[test]
675 fn test_document_encode_decode() {
676 let msg = TestMessage {
677 source_node: 0x12345678,
678 timestamp: 1000,
679 content: "Hello".to_string(),
680 ack_count: 5,
681 };
682
683 let encoded = msg.encode();
684 let decoded = TestMessage::decode(&encoded).unwrap();
685
686 assert_eq!(decoded, msg);
687 }
688
689 #[test]
690 fn test_document_merge() {
691 let mut msg1 = TestMessage {
692 source_node: 0x12345678,
693 timestamp: 1000,
694 content: "Hello".to_string(),
695 ack_count: 5,
696 };
697
698 let msg2 = TestMessage {
699 source_node: 0x12345678,
700 timestamp: 1000,
701 content: "Hello".to_string(),
702 ack_count: 10,
703 };
704
705 assert!(msg1.merge(&msg2));
706 assert_eq!(msg1.ack_count, 10);
707
708 let msg3 = TestMessage {
710 source_node: 0x12345678,
711 timestamp: 1000,
712 content: "Hello".to_string(),
713 ack_count: 3,
714 };
715 assert!(!msg1.merge(&msg3));
716 assert_eq!(msg1.ack_count, 10);
717 }
718
719 #[test]
720 fn test_registry_decode() {
721 let registry = DocumentRegistry::new();
722 registry.register::<TestMessage>();
723
724 let msg = TestMessage {
725 source_node: 0xAABBCCDD,
726 timestamp: 2000,
727 content: "Test".to_string(),
728 ack_count: 7,
729 };
730
731 let encoded = msg.encode();
732 let decoded = registry.decode(0xC0, &encoded).unwrap();
733 let decoded_msg = decoded.downcast_ref::<TestMessage>().unwrap();
734
735 assert_eq!(decoded_msg, &msg);
736 }
737
738 #[test]
739 fn test_registry_merge() {
740 let registry = DocumentRegistry::new();
741 registry.register::<TestMessage>();
742
743 let mut msg1 = TestMessage {
744 source_node: 0x12345678,
745 timestamp: 1000,
746 content: "Hello".to_string(),
747 ack_count: 5,
748 };
749
750 let msg2 = TestMessage {
751 source_node: 0x12345678,
752 timestamp: 1000,
753 content: "Hello".to_string(),
754 ack_count: 15,
755 };
756
757 let changed = registry.merge(0xC0, &mut msg1, &msg2);
758 assert!(changed);
759 assert_eq!(msg1.ack_count, 15);
760 }
761
762 #[test]
763 fn test_app_operation_encode_decode() {
764 let op = AppOperation::new(0xC0, 0x01, 0x12345678, 1000).with_payload(vec![1, 2, 3, 4]);
765
766 let encoded = op.encode();
767 let (decoded, size) = AppOperation::decode(&encoded).unwrap();
768
769 assert_eq!(size, encoded.len());
770 assert_eq!(decoded.type_id, 0xC0);
771 assert_eq!(decoded.op_code, 0x01);
772 assert_eq!(decoded.source_node, 0x12345678);
773 assert_eq!(decoded.timestamp, 1000);
774 assert_eq!(decoded.payload, vec![1, 2, 3, 4]);
775 }
776
777 #[test]
778 fn test_encode_with_header() {
779 let msg = TestMessage {
780 source_node: 0x12345678,
781 timestamp: 1000,
782 content: "Hi".to_string(),
783 ack_count: 3,
784 };
785
786 let encoded = encode_with_header(&msg);
787
788 assert_eq!(encoded[0], 0xC0); assert_eq!(encoded[1], 0x00); let (type_id, payload) = decode_header(&encoded).unwrap();
792 assert_eq!(type_id, 0xC0);
793
794 let decoded = TestMessage::decode(payload).unwrap();
795 assert_eq!(decoded, msg);
796 }
797
798 #[test]
799 fn test_is_app_type() {
800 assert!(DocumentRegistry::is_app_type(0xC0));
801 assert!(DocumentRegistry::is_app_type(0xCF));
802 assert!(!DocumentRegistry::is_app_type(0xAB));
803 assert!(!DocumentRegistry::is_app_type(0xD0));
804 }
805
806 #[test]
807 fn test_is_app_op_type() {
808 assert!(AppOperation::is_app_op_type(0x10));
809 assert!(AppOperation::is_app_op_type(0x1F));
810 assert!(!AppOperation::is_app_op_type(0x01));
811 assert!(!AppOperation::is_app_op_type(0x20));
812 }
813}