1use crate::types::*;
14use crate::{Error, Frame, QoS, Result};
15use bytes::{Buf, BufMut, Bytes, BytesMut};
16use std::collections::HashMap;
17
/// Version number of this encoding.
// NOTE(review): `encode` / `encode_with_options` write a literal `1` into
// `frame.flags.version`; presumably that is meant to be this constant — confirm.
pub const ENCODING_VERSION: u8 = 1;
20
/// Message type tags. Each encoder writes its tag as the first byte of the
/// message, and the binary decoder dispatches on that byte.
pub mod msg {
    // 0x0_
    pub const HELLO: u8 = 0x01;
    pub const WELCOME: u8 = 0x02;
    pub const ANNOUNCE: u8 = 0x03;
    pub const FEDERATION_SYNC: u8 = 0x04;

    // 0x1_
    pub const SUBSCRIBE: u8 = 0x10;
    pub const UNSUBSCRIBE: u8 = 0x11;

    // 0x2_
    pub const PUBLISH: u8 = 0x20;
    pub const SET: u8 = 0x21;
    pub const GET: u8 = 0x22;
    pub const SNAPSHOT: u8 = 0x23;
    pub const REPLAY: u8 = 0x24;

    // 0x3_
    pub const BUNDLE: u8 = 0x30;

    // 0x4_
    pub const SYNC: u8 = 0x40;
    pub const PING: u8 = 0x41;
    pub const PONG: u8 = 0x42;

    // 0x5_
    pub const ACK: u8 = 0x50;
    pub const ERROR: u8 = 0x51;

    // 0x6_
    pub const QUERY: u8 = 0x60;
    pub const RESULT: u8 = 0x61;
}
43
/// Value type codes written in front of (or packed alongside) encoded values.
pub mod val {
    // Scalars without a payload or with a single byte.
    pub const NULL: u8 = 0x00;
    pub const BOOL: u8 = 0x01;

    // Signed integers.
    pub const I8: u8 = 0x02;
    pub const I16: u8 = 0x03;
    pub const I32: u8 = 0x04;
    pub const I64: u8 = 0x05;

    // Floating point.
    pub const F32: u8 = 0x06;
    pub const F64: u8 = 0x07;

    // Length-prefixed data.
    pub const STRING: u8 = 0x08;
    pub const BYTES: u8 = 0x09;

    // Containers.
    pub const ARRAY: u8 = 0x0A;
    pub const MAP: u8 = 0x0B;
}
59
/// Numeric codes for each signal type as written on the wire.
pub mod sig {
    pub const PARAM: u8 = 0x00;
    pub const EVENT: u8 = 0x01;
    pub const STREAM: u8 = 0x02;
    pub const GESTURE: u8 = 0x03;
    pub const TIMELINE: u8 = 0x04;
}
68
/// Numeric codes for each gesture phase as written on the wire.
pub mod phase {
    pub const START: u8 = 0x00;
    pub const MOVE: u8 = 0x01;
    pub const END: u8 = 0x02;
    pub const CANCEL: u8 = 0x03;
}
76
77#[inline]
83pub fn encode_message(message: &Message) -> Result<Bytes> {
84 let capacity = estimate_message_size(message);
86 let mut buf = BytesMut::with_capacity(capacity);
87 encode_message_to_buf(&mut buf, message)?;
88 Ok(buf.freeze())
89}
90
91#[inline]
93fn estimate_message_size(msg: &Message) -> usize {
94 match msg {
95 Message::Set(m) => 2 + 2 + m.address.len() + 9 + if m.revision.is_some() { 8 } else { 0 },
96 Message::Publish(m) => 2 + 2 + m.address.len() + 16,
97 Message::Hello(m) => 4 + m.name.len() + 2,
98 Message::Welcome(m) => 12 + m.name.len() + m.session.len() + 4,
99 Message::Subscribe(m) => 6 + m.pattern.len() + 16,
100 Message::Bundle(m) => 12 + m.messages.len() * 48,
101 Message::Replay(m) => 4 + m.pattern.len() + 20,
102 Message::FederationSync(m) => {
103 4 + m.patterns.iter().map(|p| 2 + p.len()).sum::<usize>() + m.revisions.len() * 12 + 16
104 }
105 Message::Ping | Message::Pong => 5, _ => 64, }
108}
109
110#[inline]
112pub fn decode_message(bytes: &[u8]) -> Result<Message> {
113 if bytes.is_empty() {
114 return Err(Error::BufferTooSmall { needed: 1, have: 0 });
115 }
116
117 let first = bytes[0];
118
119 if is_msgpack_map(first) {
122 decode_v2_msgpack(bytes)
124 } else {
125 decode_v3_binary(bytes)
127 }
128}
129
130#[inline]
132pub fn encode(message: &Message) -> Result<Bytes> {
133 let payload = encode_message(message)?;
134 let mut frame = Frame::new(payload).with_qos(message.default_qos());
135 frame.flags.version = 1; frame.encode()
137}
138
139pub fn encode_with_options(
141 message: &Message,
142 qos: Option<QoS>,
143 timestamp: Option<u64>,
144) -> Result<Bytes> {
145 let payload = encode_message(message)?;
146 let mut frame = Frame::new(payload);
147 frame.flags.version = 1; if let Some(qos) = qos {
150 frame = frame.with_qos(qos);
151 } else {
152 frame = frame.with_qos(message.default_qos());
153 }
154
155 if let Some(ts) = timestamp {
156 frame = frame.with_timestamp(ts);
157 }
158
159 frame.encode()
160}
161
162#[inline]
164pub fn decode(bytes: &[u8]) -> Result<(Message, Frame)> {
165 let frame = Frame::decode(bytes)?;
166 let message = decode_message(&frame.payload)?;
167 Ok((message, frame))
168}
169
170pub fn encode_payload(message: &Message) -> Result<Vec<u8>> {
172 let bytes = encode_message(message)?;
173 Ok(bytes.to_vec())
174}
175
/// Decodes an unframed message from `bytes`; delegates directly to
/// `decode_message`.
pub fn decode_payload(bytes: &[u8]) -> Result<Message> {
    decode_message(bytes)
}
180
181fn encode_message_to_buf(buf: &mut BytesMut, msg: &Message) -> Result<()> {
186 match msg {
187 Message::Hello(m) => encode_hello(buf, m),
188 Message::Welcome(m) => encode_welcome(buf, m),
189 Message::Announce(m) => encode_announce(buf, m),
190 Message::Subscribe(m) => encode_subscribe(buf, m),
191 Message::Unsubscribe(m) => encode_unsubscribe(buf, m),
192 Message::Publish(m) => encode_publish(buf, m),
193 Message::Set(m) => encode_set(buf, m),
194 Message::Get(m) => encode_get(buf, m),
195 Message::Snapshot(m) => encode_snapshot(buf, m),
196 Message::Replay(m) => encode_replay(buf, m),
197 Message::FederationSync(m) => encode_federation_sync(buf, m),
198 Message::Bundle(m) => encode_bundle(buf, m),
199 Message::Sync(m) => encode_sync(buf, m),
200 Message::Ping => {
201 buf.put_u8(msg::PING);
202 Ok(())
203 }
204 Message::Pong => {
205 buf.put_u8(msg::PONG);
206 Ok(())
207 }
208 Message::Ack(m) => encode_ack(buf, m),
209 Message::Error(m) => encode_error(buf, m),
210 Message::Query(m) => encode_query(buf, m),
211 Message::Result(m) => encode_result(buf, m),
212 }
213}
214
215#[inline]
218fn encode_set(buf: &mut BytesMut, msg: &SetMessage) -> Result<()> {
219 buf.put_u8(msg::SET);
220
221 let vtype = value_type_code(&msg.value);
222 let mut flags = vtype & 0x0F;
223 if msg.revision.is_some() {
224 flags |= 0x80;
225 }
226 if msg.lock {
227 flags |= 0x40;
228 }
229 if msg.unlock {
230 flags |= 0x20;
231 }
232 buf.put_u8(flags);
233
234 encode_string(buf, &msg.address)?;
236
237 encode_value_data(buf, &msg.value)?;
239
240 if let Some(rev) = msg.revision {
242 buf.put_u64(rev);
243 }
244
245 Ok(())
246}
247
248fn encode_publish(buf: &mut BytesMut, msg: &PublishMessage) -> Result<()> {
251 buf.put_u8(msg::PUBLISH);
252
253 let sig_code = msg.signal.map(signal_type_code).unwrap_or(sig::EVENT);
254 let phase_code = msg.phase.map(gesture_phase_code).unwrap_or(phase::START);
255
256 let mut flags: u8 = (sig_code & 0x07) << 5;
257 if msg.timestamp.is_some() {
258 flags |= 0x10;
259 }
260 if msg.id.is_some() {
261 flags |= 0x08;
262 }
263 flags |= phase_code & 0x07;
264 buf.put_u8(flags);
265
266 encode_string(buf, &msg.address)?;
268
269 if let Some(ref value) = msg.value {
271 buf.put_u8(1); buf.put_u8(value_type_code(value));
273 encode_value_data(buf, value)?;
274 } else if let Some(ref payload) = msg.payload {
275 buf.put_u8(1); buf.put_u8(value_type_code(payload));
277 encode_value_data(buf, payload)?;
278 } else if let Some(ref samples) = msg.samples {
279 buf.put_u8(2); buf.put_u16(samples.len() as u16);
281 for sample in samples {
282 buf.put_f64(*sample);
283 }
284 } else {
285 buf.put_u8(0); }
287
288 if let Some(ts) = msg.timestamp {
290 buf.put_u64(ts);
291 }
292
293 if let Some(id) = msg.id {
295 buf.put_u32(id);
296 }
297
298 if let Some(rate) = msg.rate {
300 buf.put_u32(rate);
301 }
302
303 Ok(())
304}
305
306fn encode_hello(buf: &mut BytesMut, msg: &HelloMessage) -> Result<()> {
308 buf.put_u8(msg::HELLO);
309 buf.put_u8(msg.version);
310
311 let mut features: u8 = 0;
313 for f in &msg.features {
314 match f.as_str() {
315 "param" => features |= 0x80,
316 "event" => features |= 0x40,
317 "stream" => features |= 0x20,
318 "gesture" => features |= 0x10,
319 "timeline" => features |= 0x08,
320 "federation" => features |= 0x04,
321 _ => {}
322 }
323 }
324 buf.put_u8(features);
325
326 encode_string(buf, &msg.name)?;
328
329 if let Some(ref token) = msg.token {
331 encode_string(buf, token)?;
332 } else {
333 buf.put_u16(0);
334 }
335
336 Ok(())
337}
338
339fn encode_welcome(buf: &mut BytesMut, msg: &WelcomeMessage) -> Result<()> {
341 buf.put_u8(msg::WELCOME);
342 buf.put_u8(msg.version);
343
344 let mut features: u8 = 0;
346 for f in &msg.features {
347 match f.as_str() {
348 "param" => features |= 0x80,
349 "event" => features |= 0x40,
350 "stream" => features |= 0x20,
351 "gesture" => features |= 0x10,
352 "timeline" => features |= 0x08,
353 _ => {}
354 }
355 }
356 buf.put_u8(features);
357
358 buf.put_u64(msg.time);
360
361 encode_string(buf, &msg.session)?;
363
364 encode_string(buf, &msg.name)?;
366
367 if let Some(ref token) = msg.token {
369 encode_string(buf, token)?;
370 } else {
371 buf.put_u16(0);
372 }
373
374 Ok(())
375}
376
377fn encode_announce(buf: &mut BytesMut, msg: &AnnounceMessage) -> Result<()> {
379 buf.put_u8(msg::ANNOUNCE);
380
381 encode_string(buf, &msg.namespace)?;
382 buf.put_u16(msg.signals.len() as u16);
383
384 for sig in &msg.signals {
385 encode_string(buf, &sig.address)?;
386 buf.put_u8(signal_type_code(sig.signal_type));
387
388 let mut opt_flags: u8 = 0;
390 if sig.datatype.is_some() {
391 opt_flags |= 0x01;
392 }
393 if sig.access.is_some() {
394 opt_flags |= 0x02;
395 }
396 if sig.meta.is_some() {
397 opt_flags |= 0x04;
398 }
399 buf.put_u8(opt_flags);
400
401 if let Some(ref dt) = sig.datatype {
402 encode_string(buf, dt)?;
403 }
404 if let Some(ref access) = sig.access {
405 encode_string(buf, access)?;
406 }
407 if let Some(ref meta) = sig.meta {
408 let mut meta_flags: u8 = 0;
410 if meta.unit.is_some() {
411 meta_flags |= 0x01;
412 }
413 if meta.range.is_some() {
414 meta_flags |= 0x02;
415 }
416 if meta.default.is_some() {
417 meta_flags |= 0x04;
418 }
419 if meta.description.is_some() {
420 meta_flags |= 0x08;
421 }
422 buf.put_u8(meta_flags);
423
424 if let Some(ref unit) = meta.unit {
425 encode_string(buf, unit)?;
426 }
427 if let Some((min, max)) = meta.range {
428 buf.put_f64(min);
429 buf.put_f64(max);
430 }
431 if let Some(ref default) = meta.default {
432 buf.put_u8(value_type_code(default));
433 encode_value_data(buf, default)?;
434 }
435 if let Some(ref desc) = meta.description {
436 encode_string(buf, desc)?;
437 }
438 }
439 }
440
441 Ok(())
442}
443
444fn encode_subscribe(buf: &mut BytesMut, msg: &SubscribeMessage) -> Result<()> {
446 buf.put_u8(msg::SUBSCRIBE);
447 buf.put_u32(msg.id);
448
449 encode_string(buf, &msg.pattern)?;
450
451 let mut type_mask: u8 = 0;
453 if msg.types.is_empty() {
454 type_mask = 0xFF; } else {
456 for t in &msg.types {
457 match t {
458 SignalType::Param => type_mask |= 0x01,
459 SignalType::Event => type_mask |= 0x02,
460 SignalType::Stream => type_mask |= 0x04,
461 SignalType::Gesture => type_mask |= 0x08,
462 SignalType::Timeline => type_mask |= 0x10,
463 }
464 }
465 }
466 buf.put_u8(type_mask);
467
468 if let Some(ref opts) = msg.options {
470 let mut opt_flags: u8 = 0;
471 if opts.max_rate.is_some() {
472 opt_flags |= 0x01;
473 }
474 if opts.epsilon.is_some() {
475 opt_flags |= 0x02;
476 }
477 if opts.history.is_some() {
478 opt_flags |= 0x04;
479 }
480 if opts.window.is_some() {
481 opt_flags |= 0x08;
482 }
483 buf.put_u8(opt_flags);
484
485 if let Some(rate) = opts.max_rate {
486 buf.put_u32(rate);
487 }
488 if let Some(eps) = opts.epsilon {
489 buf.put_f64(eps);
490 }
491 if let Some(hist) = opts.history {
492 buf.put_u32(hist);
493 }
494 if let Some(win) = opts.window {
495 buf.put_u32(win);
496 }
497 } else {
498 buf.put_u8(0); }
500
501 Ok(())
502}
503
/// Encodes an UNSUBSCRIBE message: the type tag followed by the id (u32).
fn encode_unsubscribe(buf: &mut BytesMut, msg: &UnsubscribeMessage) -> Result<()> {
    buf.put_u8(msg::UNSUBSCRIBE);
    buf.put_u32(msg.id);
    Ok(())
}
510
511fn encode_get(buf: &mut BytesMut, msg: &GetMessage) -> Result<()> {
513 buf.put_u8(msg::GET);
514 encode_string(buf, &msg.address)?;
515 Ok(())
516}
517
518fn encode_snapshot(buf: &mut BytesMut, msg: &SnapshotMessage) -> Result<()> {
520 buf.put_u8(msg::SNAPSHOT);
521 buf.put_u16(msg.params.len() as u16);
522
523 for param in &msg.params {
524 encode_string(buf, ¶m.address)?;
525 buf.put_u8(value_type_code(¶m.value));
526 encode_value_data(buf, ¶m.value)?;
527 buf.put_u64(param.revision);
528
529 let mut opt_flags: u8 = 0;
530 if param.writer.is_some() {
531 opt_flags |= 0x01;
532 }
533 if param.timestamp.is_some() {
534 opt_flags |= 0x02;
535 }
536 buf.put_u8(opt_flags);
537
538 if let Some(ref writer) = param.writer {
539 encode_string(buf, writer)?;
540 }
541 if let Some(ts) = param.timestamp {
542 buf.put_u64(ts);
543 }
544 }
545
546 Ok(())
547}
548
549fn encode_replay(buf: &mut BytesMut, msg: &ReplayMessage) -> Result<()> {
552 buf.put_u8(msg::REPLAY);
553
554 let mut flags: u8 = 0;
555 if msg.from.is_some() {
556 flags |= 0x80;
557 }
558 if msg.to.is_some() {
559 flags |= 0x40;
560 }
561 if msg.limit.is_some() {
562 flags |= 0x20;
563 }
564 buf.put_u8(flags);
565
566 encode_string(buf, &msg.pattern)?;
567
568 if let Some(from) = msg.from {
569 buf.put_u64(from);
570 }
571 if let Some(to) = msg.to {
572 buf.put_u64(to);
573 }
574 if let Some(limit) = msg.limit {
575 buf.put_u32(limit);
576 }
577
578 let mut type_mask: u8 = 0;
580 if msg.types.is_empty() {
581 type_mask = 0xFF; } else {
583 for t in &msg.types {
584 match t {
585 SignalType::Param => type_mask |= 0x01,
586 SignalType::Event => type_mask |= 0x02,
587 SignalType::Stream => type_mask |= 0x04,
588 SignalType::Gesture => type_mask |= 0x08,
589 SignalType::Timeline => type_mask |= 0x10,
590 }
591 }
592 }
593 buf.put_u8(type_mask);
594
595 Ok(())
596}
597
598fn encode_federation_sync(buf: &mut BytesMut, msg: &FederationSyncMessage) -> Result<()> {
601 buf.put_u8(msg::FEDERATION_SYNC);
602 buf.put_u8(msg.op as u8);
603
604 buf.put_u16(msg.patterns.len() as u16);
606 for pattern in &msg.patterns {
607 encode_string(buf, pattern)?;
608 }
609
610 buf.put_u16(msg.revisions.len() as u16);
612 for (key, val) in &msg.revisions {
613 encode_string(buf, key)?;
614 buf.put_u64(*val);
615 }
616
617 let mut flags: u8 = 0;
619 if msg.since_revision.is_some() {
620 flags |= 0x80;
621 }
622 if msg.origin.is_some() {
623 flags |= 0x40;
624 }
625 buf.put_u8(flags);
626
627 if let Some(since) = msg.since_revision {
628 buf.put_u64(since);
629 }
630 if let Some(ref origin) = msg.origin {
631 encode_string(buf, origin)?;
632 }
633
634 Ok(())
635}
636
637fn encode_bundle(buf: &mut BytesMut, msg: &BundleMessage) -> Result<()> {
639 buf.put_u8(msg::BUNDLE);
640
641 let mut flags: u8 = 0;
642 if msg.timestamp.is_some() {
643 flags |= 0x80;
644 }
645 buf.put_u8(flags);
646
647 buf.put_u16(msg.messages.len() as u16);
648
649 if let Some(ts) = msg.timestamp {
650 buf.put_u64(ts);
651 }
652
653 for inner_msg in &msg.messages {
655 let mut inner_buf = BytesMut::with_capacity(64);
656 encode_message_to_buf(&mut inner_buf, inner_msg)?;
657 buf.put_u16(inner_buf.len() as u16);
658 buf.extend_from_slice(&inner_buf);
659 }
660
661 Ok(())
662}
663
664fn encode_sync(buf: &mut BytesMut, msg: &SyncMessage) -> Result<()> {
666 buf.put_u8(msg::SYNC);
667
668 let mut flags: u8 = 0;
669 if msg.t2.is_some() {
670 flags |= 0x01;
671 }
672 if msg.t3.is_some() {
673 flags |= 0x02;
674 }
675 buf.put_u8(flags);
676
677 buf.put_u64(msg.t1);
678 if let Some(t2) = msg.t2 {
679 buf.put_u64(t2);
680 }
681 if let Some(t3) = msg.t3 {
682 buf.put_u64(t3);
683 }
684
685 Ok(())
686}
687
688fn encode_ack(buf: &mut BytesMut, msg: &AckMessage) -> Result<()> {
690 buf.put_u8(msg::ACK);
691
692 let mut flags: u8 = 0;
693 if msg.address.is_some() {
694 flags |= 0x01;
695 }
696 if msg.revision.is_some() {
697 flags |= 0x02;
698 }
699 if msg.locked.is_some() {
700 flags |= 0x04;
701 }
702 if msg.holder.is_some() {
703 flags |= 0x08;
704 }
705 if msg.correlation_id.is_some() {
706 flags |= 0x10;
707 }
708 buf.put_u8(flags);
709
710 if let Some(ref addr) = msg.address {
711 encode_string(buf, addr)?;
712 }
713 if let Some(rev) = msg.revision {
714 buf.put_u64(rev);
715 }
716 if let Some(locked) = msg.locked {
717 buf.put_u8(if locked { 1 } else { 0 });
718 }
719 if let Some(ref holder) = msg.holder {
720 encode_string(buf, holder)?;
721 }
722 if let Some(corr) = msg.correlation_id {
723 buf.put_u32(corr);
724 }
725
726 Ok(())
727}
728
729fn encode_error(buf: &mut BytesMut, msg: &ErrorMessage) -> Result<()> {
731 buf.put_u8(msg::ERROR);
732 buf.put_u16(msg.code);
733 encode_string(buf, &msg.message)?;
734
735 let mut flags: u8 = 0;
736 if msg.address.is_some() {
737 flags |= 0x01;
738 }
739 if msg.correlation_id.is_some() {
740 flags |= 0x02;
741 }
742 buf.put_u8(flags);
743
744 if let Some(ref addr) = msg.address {
745 encode_string(buf, addr)?;
746 }
747 if let Some(corr) = msg.correlation_id {
748 buf.put_u32(corr);
749 }
750
751 Ok(())
752}
753
754fn encode_query(buf: &mut BytesMut, msg: &QueryMessage) -> Result<()> {
756 buf.put_u8(msg::QUERY);
757 encode_string(buf, &msg.pattern)?;
758 Ok(())
759}
760
761fn encode_result(buf: &mut BytesMut, msg: &ResultMessage) -> Result<()> {
763 buf.put_u8(msg::RESULT);
764 buf.put_u16(msg.signals.len() as u16);
765
766 for sig in &msg.signals {
767 encode_string(buf, &sig.address)?;
768 buf.put_u8(signal_type_code(sig.signal_type));
769
770 let mut opt_flags: u8 = 0;
771 if sig.datatype.is_some() {
772 opt_flags |= 0x01;
773 }
774 if sig.access.is_some() {
775 opt_flags |= 0x02;
776 }
777 buf.put_u8(opt_flags);
778
779 if let Some(ref dt) = sig.datatype {
780 encode_string(buf, dt)?;
781 }
782 if let Some(ref access) = sig.access {
783 encode_string(buf, access)?;
784 }
785 }
786
787 Ok(())
788}
789
790#[inline(always)]
795fn encode_string(buf: &mut BytesMut, s: &str) -> Result<()> {
796 let bytes = s.as_bytes();
797 if bytes.len() > u16::MAX as usize {
798 return Err(Error::PayloadTooLarge(bytes.len()));
799 }
800 buf.put_u16(bytes.len() as u16);
801 buf.extend_from_slice(bytes);
802 Ok(())
803}
804
805#[inline]
806fn encode_value_data(buf: &mut BytesMut, value: &Value) -> Result<()> {
807 match value {
808 Value::Null => {} Value::Bool(b) => buf.put_u8(if *b { 1 } else { 0 }),
810 Value::Int(i) => buf.put_i64(*i),
811 Value::Float(f) => buf.put_f64(*f),
812 Value::String(s) => encode_string(buf, s)?,
813 Value::Bytes(b) => {
814 buf.put_u16(b.len() as u16);
815 buf.extend_from_slice(b);
816 }
817 Value::Array(arr) => {
818 buf.put_u16(arr.len() as u16);
819 for item in arr {
820 buf.put_u8(value_type_code(item));
821 encode_value_data(buf, item)?;
822 }
823 }
824 Value::Map(map) => {
825 buf.put_u16(map.len() as u16);
826 for (key, val) in map {
827 encode_string(buf, key)?;
828 buf.put_u8(value_type_code(val));
829 encode_value_data(buf, val)?;
830 }
831 }
832 }
833 Ok(())
834}
835
836#[inline(always)]
837fn value_type_code(value: &Value) -> u8 {
838 match value {
839 Value::Null => val::NULL,
840 Value::Bool(_) => val::BOOL,
841 Value::Int(_) => val::I64,
842 Value::Float(_) => val::F64,
843 Value::String(_) => val::STRING,
844 Value::Bytes(_) => val::BYTES,
845 Value::Array(_) => val::ARRAY,
846 Value::Map(_) => val::MAP,
847 }
848}
849
850fn signal_type_code(sig: SignalType) -> u8 {
851 match sig {
852 SignalType::Param => sig::PARAM,
853 SignalType::Event => sig::EVENT,
854 SignalType::Stream => sig::STREAM,
855 SignalType::Gesture => sig::GESTURE,
856 SignalType::Timeline => sig::TIMELINE,
857 }
858}
859
860fn gesture_phase_code(phase: GesturePhase) -> u8 {
861 match phase {
862 GesturePhase::Start => phase::START,
863 GesturePhase::Move => phase::MOVE,
864 GesturePhase::End => phase::END,
865 GesturePhase::Cancel => phase::CANCEL,
866 }
867}
868
869fn decode_v3_binary(bytes: &[u8]) -> Result<Message> {
874 if bytes.is_empty() {
875 return Err(Error::BufferTooSmall { needed: 1, have: 0 });
876 }
877
878 let mut buf = bytes;
879 let msg_type = buf.get_u8();
880
881 match msg_type {
882 msg::HELLO => decode_hello(&mut buf),
883 msg::WELCOME => decode_welcome(&mut buf),
884 msg::ANNOUNCE => decode_announce(&mut buf),
885 msg::SUBSCRIBE => decode_subscribe(&mut buf),
886 msg::UNSUBSCRIBE => decode_unsubscribe(&mut buf),
887 msg::PUBLISH => decode_publish(&mut buf),
888 msg::SET => decode_set(&mut buf),
889 msg::GET => decode_get(&mut buf),
890 msg::SNAPSHOT => decode_snapshot(&mut buf),
891 msg::REPLAY => decode_replay(&mut buf),
892 msg::FEDERATION_SYNC => decode_federation_sync(&mut buf),
893 msg::BUNDLE => decode_bundle(&mut buf),
894 msg::SYNC => decode_sync(&mut buf),
895 msg::PING => Ok(Message::Ping),
896 msg::PONG => Ok(Message::Pong),
897 msg::ACK => decode_ack(&mut buf),
898 msg::ERROR => decode_error(&mut buf),
899 msg::QUERY => decode_query(&mut buf),
900 msg::RESULT => decode_result(&mut buf),
901 _ => Err(Error::UnknownMessageType(msg_type)),
902 }
903}
904
905#[inline]
906fn decode_set(buf: &mut &[u8]) -> Result<Message> {
907 let flags = buf.get_u8();
908 let vtype = flags & 0x0F;
909 let has_rev = (flags & 0x80) != 0;
910 let lock = (flags & 0x40) != 0;
911 let unlock = (flags & 0x20) != 0;
912
913 let address = decode_string(buf)?;
914 let value = decode_value_data(buf, vtype)?;
915
916 let revision = if has_rev { Some(buf.get_u64()) } else { None };
917
918 Ok(Message::Set(SetMessage {
919 address,
920 value,
921 revision,
922 lock,
923 unlock,
924 }))
925}
926
927fn decode_publish(buf: &mut &[u8]) -> Result<Message> {
928 let flags = buf.get_u8();
929 let sig_code = (flags >> 5) & 0x07;
930 let has_ts = (flags & 0x10) != 0;
931 let has_id = (flags & 0x08) != 0;
932 let phase_code = flags & 0x07;
933
934 let address = decode_string(buf)?;
935
936 let value_indicator = buf.get_u8();
938 let (value, payload, samples) = match value_indicator {
939 0 => (None, None, None),
940 1 => {
941 let vtype = buf.get_u8();
942 let v = decode_value_data(buf, vtype)?;
943 (Some(v), None, None)
944 }
945 2 => {
946 let count = buf.get_u16() as usize;
947 let mut s = Vec::with_capacity(count);
948 for _ in 0..count {
949 s.push(buf.get_f64());
950 }
951 (None, None, Some(s))
952 }
953 _ => (None, None, None),
954 };
955
956 let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
957 let id = if has_id { Some(buf.get_u32()) } else { None };
958
959 let rate = if buf.remaining() >= 4 {
961 Some(buf.get_u32())
962 } else {
963 None
964 };
965
966 let signal = Some(signal_type_from_code(sig_code));
967 let phase = Some(gesture_phase_from_code(phase_code));
968
969 Ok(Message::Publish(PublishMessage {
970 address,
971 signal,
972 value,
973 payload,
974 samples,
975 rate,
976 id,
977 phase,
978 timestamp,
979 timeline: None, }))
981}
982
983fn decode_hello(buf: &mut &[u8]) -> Result<Message> {
984 let version = buf.get_u8();
985 let feature_flags = buf.get_u8();
986
987 let mut features = Vec::new();
988 if feature_flags & 0x80 != 0 {
989 features.push("param".to_string());
990 }
991 if feature_flags & 0x40 != 0 {
992 features.push("event".to_string());
993 }
994 if feature_flags & 0x20 != 0 {
995 features.push("stream".to_string());
996 }
997 if feature_flags & 0x10 != 0 {
998 features.push("gesture".to_string());
999 }
1000 if feature_flags & 0x08 != 0 {
1001 features.push("timeline".to_string());
1002 }
1003 if feature_flags & 0x04 != 0 {
1004 features.push("federation".to_string());
1005 }
1006
1007 let name = decode_string(buf)?;
1008 let token_str = decode_string(buf)?;
1009 let token = if token_str.is_empty() {
1010 None
1011 } else {
1012 Some(token_str)
1013 };
1014
1015 Ok(Message::Hello(HelloMessage {
1016 version,
1017 name,
1018 features,
1019 capabilities: None,
1020 token,
1021 }))
1022}
1023
1024fn decode_welcome(buf: &mut &[u8]) -> Result<Message> {
1025 let version = buf.get_u8();
1026 let feature_flags = buf.get_u8();
1027
1028 let mut features = Vec::new();
1029 if feature_flags & 0x80 != 0 {
1030 features.push("param".to_string());
1031 }
1032 if feature_flags & 0x40 != 0 {
1033 features.push("event".to_string());
1034 }
1035 if feature_flags & 0x20 != 0 {
1036 features.push("stream".to_string());
1037 }
1038 if feature_flags & 0x10 != 0 {
1039 features.push("gesture".to_string());
1040 }
1041 if feature_flags & 0x08 != 0 {
1042 features.push("timeline".to_string());
1043 }
1044
1045 let time = buf.get_u64();
1046 let session = decode_string(buf)?;
1047 let name = decode_string(buf)?;
1048
1049 let token_str = decode_string(buf)?;
1050 let token = if token_str.is_empty() {
1051 None
1052 } else {
1053 Some(token_str)
1054 };
1055
1056 Ok(Message::Welcome(WelcomeMessage {
1057 version,
1058 session,
1059 name,
1060 features,
1061 time,
1062 token,
1063 }))
1064}
1065
1066fn decode_announce(buf: &mut &[u8]) -> Result<Message> {
1067 let namespace = decode_string(buf)?;
1068 let count = buf.get_u16() as usize;
1069
1070 let mut signals = Vec::with_capacity(count);
1071 for _ in 0..count {
1072 let address = decode_string(buf)?;
1073 let sig_code = buf.get_u8();
1074 let opt_flags = buf.get_u8();
1075
1076 let datatype = if opt_flags & 0x01 != 0 {
1077 Some(decode_string(buf)?)
1078 } else {
1079 None
1080 };
1081 let access = if opt_flags & 0x02 != 0 {
1082 Some(decode_string(buf)?)
1083 } else {
1084 None
1085 };
1086
1087 let meta = if opt_flags & 0x04 != 0 {
1088 let meta_flags = buf.get_u8();
1089
1090 let unit = if meta_flags & 0x01 != 0 {
1091 Some(decode_string(buf)?)
1092 } else {
1093 None
1094 };
1095 let range = if meta_flags & 0x02 != 0 {
1096 let min = buf.get_f64();
1097 let max = buf.get_f64();
1098 Some((min, max))
1099 } else {
1100 None
1101 };
1102 let default = if meta_flags & 0x04 != 0 {
1103 let vtype = buf.get_u8();
1104 Some(decode_value_data(buf, vtype)?)
1105 } else {
1106 None
1107 };
1108 let description = if meta_flags & 0x08 != 0 {
1109 Some(decode_string(buf)?)
1110 } else {
1111 None
1112 };
1113
1114 Some(SignalMeta {
1115 unit,
1116 range,
1117 default,
1118 description,
1119 })
1120 } else {
1121 None
1122 };
1123
1124 signals.push(SignalDefinition {
1125 address,
1126 signal_type: signal_type_from_code(sig_code),
1127 datatype,
1128 access,
1129 meta,
1130 });
1131 }
1132
1133 Ok(Message::Announce(AnnounceMessage {
1134 namespace,
1135 signals,
1136 meta: None,
1137 }))
1138}
1139
1140fn decode_subscribe(buf: &mut &[u8]) -> Result<Message> {
1141 let id = buf.get_u32();
1142 let pattern = decode_string(buf)?;
1143 let type_mask = buf.get_u8();
1144
1145 let mut types = Vec::new();
1146 if type_mask == 0xFF {
1147 } else {
1149 if type_mask & 0x01 != 0 {
1150 types.push(SignalType::Param);
1151 }
1152 if type_mask & 0x02 != 0 {
1153 types.push(SignalType::Event);
1154 }
1155 if type_mask & 0x04 != 0 {
1156 types.push(SignalType::Stream);
1157 }
1158 if type_mask & 0x08 != 0 {
1159 types.push(SignalType::Gesture);
1160 }
1161 if type_mask & 0x10 != 0 {
1162 types.push(SignalType::Timeline);
1163 }
1164 }
1165
1166 let opt_flags = buf.get_u8();
1167 let options = if opt_flags != 0 {
1168 let max_rate = if opt_flags & 0x01 != 0 {
1169 Some(buf.get_u32())
1170 } else {
1171 None
1172 };
1173 let epsilon = if opt_flags & 0x02 != 0 {
1174 Some(buf.get_f64())
1175 } else {
1176 None
1177 };
1178 let history = if opt_flags & 0x04 != 0 {
1179 Some(buf.get_u32())
1180 } else {
1181 None
1182 };
1183 let window = if opt_flags & 0x08 != 0 {
1184 Some(buf.get_u32())
1185 } else {
1186 None
1187 };
1188
1189 Some(SubscribeOptions {
1190 max_rate,
1191 epsilon,
1192 history,
1193 window,
1194 })
1195 } else {
1196 None
1197 };
1198
1199 Ok(Message::Subscribe(SubscribeMessage {
1200 id,
1201 pattern,
1202 types,
1203 options,
1204 }))
1205}
1206
1207fn decode_unsubscribe(buf: &mut &[u8]) -> Result<Message> {
1208 let id = buf.get_u32();
1209 Ok(Message::Unsubscribe(UnsubscribeMessage { id }))
1210}
1211
1212fn decode_get(buf: &mut &[u8]) -> Result<Message> {
1213 let address = decode_string(buf)?;
1214 Ok(Message::Get(GetMessage { address }))
1215}
1216
1217fn decode_snapshot(buf: &mut &[u8]) -> Result<Message> {
1218 let count = buf.get_u16() as usize;
1219 let mut params = Vec::with_capacity(count);
1220
1221 for _ in 0..count {
1222 let address = decode_string(buf)?;
1223 let vtype = buf.get_u8();
1224 let value = decode_value_data(buf, vtype)?;
1225 let revision = buf.get_u64();
1226 let opt_flags = buf.get_u8();
1227
1228 let writer = if opt_flags & 0x01 != 0 {
1229 Some(decode_string(buf)?)
1230 } else {
1231 None
1232 };
1233 let timestamp = if opt_flags & 0x02 != 0 {
1234 Some(buf.get_u64())
1235 } else {
1236 None
1237 };
1238
1239 params.push(ParamValue {
1240 address,
1241 value,
1242 revision,
1243 writer,
1244 timestamp,
1245 });
1246 }
1247
1248 Ok(Message::Snapshot(SnapshotMessage { params }))
1249}
1250
1251fn decode_replay(buf: &mut &[u8]) -> Result<Message> {
1252 let flags = buf.get_u8();
1253 let has_from = (flags & 0x80) != 0;
1254 let has_to = (flags & 0x40) != 0;
1255 let has_limit = (flags & 0x20) != 0;
1256
1257 let pattern = decode_string(buf)?;
1258
1259 let from = if has_from { Some(buf.get_u64()) } else { None };
1260 let to = if has_to { Some(buf.get_u64()) } else { None };
1261 let limit = if has_limit { Some(buf.get_u32()) } else { None };
1262
1263 let type_mask = buf.get_u8();
1264 let mut types = Vec::new();
1265 if type_mask != 0xFF {
1266 if type_mask & 0x01 != 0 {
1267 types.push(SignalType::Param);
1268 }
1269 if type_mask & 0x02 != 0 {
1270 types.push(SignalType::Event);
1271 }
1272 if type_mask & 0x04 != 0 {
1273 types.push(SignalType::Stream);
1274 }
1275 if type_mask & 0x08 != 0 {
1276 types.push(SignalType::Gesture);
1277 }
1278 if type_mask & 0x10 != 0 {
1279 types.push(SignalType::Timeline);
1280 }
1281 }
1282
1283 Ok(Message::Replay(ReplayMessage {
1284 pattern,
1285 from,
1286 to,
1287 limit,
1288 types,
1289 }))
1290}
1291
1292fn decode_federation_sync(buf: &mut &[u8]) -> Result<Message> {
1293 let op_byte = buf.get_u8();
1294 let op = match op_byte {
1295 0x01 => FederationOp::DeclareNamespaces,
1296 0x02 => FederationOp::RequestSync,
1297 0x03 => FederationOp::RevisionVector,
1298 0x04 => FederationOp::SyncComplete,
1299 _ => return Err(Error::DecodeError("unknown federation op".into())),
1300 };
1301
1302 let pattern_count = buf.get_u16() as usize;
1304 let mut patterns = Vec::with_capacity(pattern_count);
1305 for _ in 0..pattern_count {
1306 patterns.push(decode_string(buf)?);
1307 }
1308
1309 let revision_count = buf.get_u16() as usize;
1311 let mut revisions = std::collections::HashMap::with_capacity(revision_count);
1312 for _ in 0..revision_count {
1313 let key = decode_string(buf)?;
1314 let val = buf.get_u64();
1315 revisions.insert(key, val);
1316 }
1317
1318 let flags = buf.get_u8();
1320 let since_revision = if flags & 0x80 != 0 {
1321 Some(buf.get_u64())
1322 } else {
1323 None
1324 };
1325 let origin = if flags & 0x40 != 0 {
1326 Some(decode_string(buf)?)
1327 } else {
1328 None
1329 };
1330
1331 Ok(Message::FederationSync(FederationSyncMessage {
1332 op,
1333 patterns,
1334 revisions,
1335 since_revision,
1336 origin,
1337 }))
1338}
1339
1340fn decode_bundle(buf: &mut &[u8]) -> Result<Message> {
1341 let flags = buf.get_u8();
1342 let has_ts = (flags & 0x80) != 0;
1343 let count = buf.get_u16() as usize;
1344
1345 let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
1346
1347 let mut messages = Vec::with_capacity(count);
1348 for _ in 0..count {
1349 let len = buf.get_u16() as usize;
1350 let inner_bytes = &buf[..len];
1351 buf.advance(len);
1352 messages.push(decode_v3_binary(inner_bytes)?);
1353 }
1354
1355 Ok(Message::Bundle(BundleMessage {
1356 timestamp,
1357 messages,
1358 }))
1359}
1360
1361fn decode_sync(buf: &mut &[u8]) -> Result<Message> {
1362 let flags = buf.get_u8();
1363 let t1 = buf.get_u64();
1364 let t2 = if flags & 0x01 != 0 {
1365 Some(buf.get_u64())
1366 } else {
1367 None
1368 };
1369 let t3 = if flags & 0x02 != 0 {
1370 Some(buf.get_u64())
1371 } else {
1372 None
1373 };
1374
1375 Ok(Message::Sync(SyncMessage { t1, t2, t3 }))
1376}
1377
1378fn decode_ack(buf: &mut &[u8]) -> Result<Message> {
1379 let flags = buf.get_u8();
1380
1381 let address = if flags & 0x01 != 0 {
1382 Some(decode_string(buf)?)
1383 } else {
1384 None
1385 };
1386 let revision = if flags & 0x02 != 0 {
1387 Some(buf.get_u64())
1388 } else {
1389 None
1390 };
1391 let locked = if flags & 0x04 != 0 {
1392 Some(buf.get_u8() != 0)
1393 } else {
1394 None
1395 };
1396 let holder = if flags & 0x08 != 0 {
1397 Some(decode_string(buf)?)
1398 } else {
1399 None
1400 };
1401 let correlation_id = if flags & 0x10 != 0 {
1402 Some(buf.get_u32())
1403 } else {
1404 None
1405 };
1406
1407 Ok(Message::Ack(AckMessage {
1408 address,
1409 revision,
1410 locked,
1411 holder,
1412 correlation_id,
1413 }))
1414}
1415
1416fn decode_error(buf: &mut &[u8]) -> Result<Message> {
1417 let code = buf.get_u16();
1418 let message = decode_string(buf)?;
1419 let flags = buf.get_u8();
1420
1421 let address = if flags & 0x01 != 0 {
1422 Some(decode_string(buf)?)
1423 } else {
1424 None
1425 };
1426 let correlation_id = if flags & 0x02 != 0 {
1427 Some(buf.get_u32())
1428 } else {
1429 None
1430 };
1431
1432 Ok(Message::Error(ErrorMessage {
1433 code,
1434 message,
1435 address,
1436 correlation_id,
1437 }))
1438}
1439
1440fn decode_query(buf: &mut &[u8]) -> Result<Message> {
1441 let pattern = decode_string(buf)?;
1442 Ok(Message::Query(QueryMessage { pattern }))
1443}
1444
1445fn decode_result(buf: &mut &[u8]) -> Result<Message> {
1446 let count = buf.get_u16() as usize;
1447 let mut signals = Vec::with_capacity(count);
1448
1449 for _ in 0..count {
1450 let address = decode_string(buf)?;
1451 let sig_code = buf.get_u8();
1452 let opt_flags = buf.get_u8();
1453
1454 let datatype = if opt_flags & 0x01 != 0 {
1455 Some(decode_string(buf)?)
1456 } else {
1457 None
1458 };
1459 let access = if opt_flags & 0x02 != 0 {
1460 Some(decode_string(buf)?)
1461 } else {
1462 None
1463 };
1464
1465 signals.push(SignalDefinition {
1466 address,
1467 signal_type: signal_type_from_code(sig_code),
1468 datatype,
1469 access,
1470 meta: None,
1471 });
1472 }
1473
1474 Ok(Message::Result(ResultMessage { signals }))
1475}
1476
1477#[inline(always)]
1482fn decode_string(buf: &mut &[u8]) -> Result<String> {
1483 if buf.remaining() < 2 {
1484 return Err(Error::BufferTooSmall {
1485 needed: 2,
1486 have: buf.remaining(),
1487 });
1488 }
1489 let len = buf.get_u16() as usize;
1490 if buf.remaining() < len {
1491 return Err(Error::BufferTooSmall {
1492 needed: len,
1493 have: buf.remaining(),
1494 });
1495 }
1496 let bytes = &buf[..len];
1497 buf.advance(len);
1498 String::from_utf8(bytes.to_vec()).map_err(|e| Error::DecodeError(e.to_string()))
1499}
1500
1501#[inline]
1502fn decode_value_data(buf: &mut &[u8], vtype: u8) -> Result<Value> {
1503 match vtype {
1504 val::NULL => Ok(Value::Null),
1505 val::BOOL => {
1506 let b = buf.get_u8();
1507 Ok(Value::Bool(b != 0))
1508 }
1509 val::I8 => {
1510 let i = buf.get_i8() as i64;
1511 Ok(Value::Int(i))
1512 }
1513 val::I16 => {
1514 let i = buf.get_i16() as i64;
1515 Ok(Value::Int(i))
1516 }
1517 val::I32 => {
1518 let i = buf.get_i32() as i64;
1519 Ok(Value::Int(i))
1520 }
1521 val::I64 => {
1522 let i = buf.get_i64();
1523 Ok(Value::Int(i))
1524 }
1525 val::F32 => {
1526 let f = buf.get_f32() as f64;
1527 Ok(Value::Float(f))
1528 }
1529 val::F64 => {
1530 let f = buf.get_f64();
1531 Ok(Value::Float(f))
1532 }
1533 val::STRING => {
1534 let s = decode_string(buf)?;
1535 Ok(Value::String(s))
1536 }
1537 val::BYTES => {
1538 if buf.remaining() < 2 {
1539 return Err(Error::BufferTooSmall {
1540 needed: 2,
1541 have: buf.remaining(),
1542 });
1543 }
1544 let len = buf.get_u16() as usize;
1545 if buf.remaining() < len {
1546 return Err(Error::BufferTooSmall {
1547 needed: len,
1548 have: buf.remaining(),
1549 });
1550 }
1551 let bytes = buf[..len].to_vec();
1552 buf.advance(len);
1553 Ok(Value::Bytes(bytes))
1554 }
1555 val::ARRAY => {
1556 let count = buf.get_u16() as usize;
1557 let mut arr = Vec::with_capacity(count);
1558 for _ in 0..count {
1559 let item_type = buf.get_u8();
1560 arr.push(decode_value_data(buf, item_type)?);
1561 }
1562 Ok(Value::Array(arr))
1563 }
1564 val::MAP => {
1565 let count = buf.get_u16() as usize;
1566 let mut map = HashMap::with_capacity(count);
1567 for _ in 0..count {
1568 let key = decode_string(buf)?;
1569 let val_type = buf.get_u8();
1570 let val = decode_value_data(buf, val_type)?;
1571 map.insert(key, val);
1572 }
1573 Ok(Value::Map(map))
1574 }
1575 _ => Err(Error::DecodeError(format!(
1576 "unknown value type: 0x{:02x}",
1577 vtype
1578 ))),
1579 }
1580}
1581
1582fn signal_type_from_code(code: u8) -> SignalType {
1583 match code {
1584 sig::PARAM => SignalType::Param,
1585 sig::EVENT => SignalType::Event,
1586 sig::STREAM => SignalType::Stream,
1587 sig::GESTURE => SignalType::Gesture,
1588 sig::TIMELINE => SignalType::Timeline,
1589 _ => SignalType::Event, }
1591}
1592
1593fn gesture_phase_from_code(code: u8) -> GesturePhase {
1594 match code {
1595 phase::START => GesturePhase::Start,
1596 phase::MOVE => GesturePhase::Move,
1597 phase::END => GesturePhase::End,
1598 phase::CANCEL => GesturePhase::Cancel,
1599 _ => GesturePhase::Start, }
1601}
1602
/// Returns `true` when `byte` is a MessagePack map marker: a fixmap
/// (`0x80..=0x8F`), map16 (`0xDE`) or map32 (`0xDF`).
fn is_msgpack_map(byte: u8) -> bool {
    matches!(byte, 0x80..=0x8F | 0xDE | 0xDF)
}
1611
1612fn decode_v2_msgpack(bytes: &[u8]) -> Result<Message> {
1613 rmp_serde::from_slice(bytes).map_err(|e| Error::DecodeError(e.to_string()))
1614}
1615
#[cfg(test)]
mod tests {
    use super::*;

    /// HELLO survives an encode/decode round trip and is framed with
    /// fire-and-forget QoS and frame version 1.
    #[test]
    fn test_hello_roundtrip() {
        let original = Message::Hello(HelloMessage {
            version: 1,
            name: String::from("Test Client"),
            features: vec![String::from("param"), String::from("event")],
            capabilities: None,
            token: None,
        });

        let wire = encode(&original).unwrap();
        let (decoded, frame) = decode(&wire).unwrap();

        if let Message::Hello(hello) = decoded {
            assert_eq!(hello.version, 1);
            assert_eq!(hello.name, "Test Client");
            assert!(hello.features.iter().any(|f| f == "param"));
            assert!(hello.features.iter().any(|f| f == "event"));
        } else {
            panic!("Expected Hello message");
        }

        assert_eq!(frame.flags.qos, QoS::Fire);
        assert_eq!(frame.flags.version, 1);
    }

    /// SET keeps its address, value and revision, and is framed with
    /// confirm QoS.
    #[test]
    fn test_set_roundtrip() {
        let original = Message::Set(SetMessage {
            address: String::from("/test/value"),
            value: Value::Float(0.75),
            revision: Some(42),
            lock: false,
            unlock: false,
        });

        let wire = encode(&original).unwrap();
        let (decoded, frame) = decode(&wire).unwrap();

        if let Message::Set(set) = decoded {
            assert_eq!(set.address, "/test/value");
            assert_eq!(set.value.as_f64(), Some(0.75));
            assert_eq!(set.revision, Some(42));
        } else {
            panic!("Expected Set message");
        }

        assert_eq!(frame.flags.qos, QoS::Confirm);
    }

    /// The binary SET payload must be at least 40% smaller than the named
    /// MessagePack encoding of the same message.
    #[test]
    fn test_set_size_reduction() {
        let sample = Message::Set(SetMessage {
            address: String::from("/test/value"),
            value: Value::Float(0.5),
            revision: Some(1),
            lock: false,
            unlock: false,
        });

        let binary_len = encode_message(&sample).unwrap().len();
        let msgpack_len = rmp_serde::to_vec_named(&sample).unwrap().len();

        println!("Binary payload: {} bytes", binary_len);
        println!("MessagePack payload: {} bytes", msgpack_len);

        assert!(
            binary_len < msgpack_len,
            "Binary encoding ({}) should be smaller than MessagePack ({})",
            binary_len,
            msgpack_len
        );

        // Integer percentage of bytes saved relative to MessagePack.
        let relative_size = binary_len * 100 / msgpack_len;
        let savings = 100 - relative_size;
        println!("Size reduction: {}%", savings);
        assert!(
            savings >= 40,
            "Expected at least 40% size reduction, got {}%",
            savings
        );
    }

    /// A timestamped BUNDLE of two SET messages keeps its timestamp and
    /// message count.
    #[test]
    fn test_bundle_roundtrip() {
        let light = |address: &str, level: f64| {
            Message::Set(SetMessage {
                address: address.to_string(),
                value: Value::Float(level),
                revision: None,
                lock: false,
                unlock: false,
            })
        };

        let original = Message::Bundle(BundleMessage {
            timestamp: Some(1000000),
            messages: vec![light("/light/1", 1.0), light("/light/2", 0.0)],
        });

        let wire = encode(&original).unwrap();
        let (decoded, _) = decode(&wire).unwrap();

        if let Message::Bundle(bundle) = decoded {
            assert_eq!(bundle.timestamp, Some(1000000));
            assert_eq!(bundle.messages.len(), 2);
        } else {
            panic!("Expected Bundle message");
        }
    }

    /// Each listed value variant round-trips unchanged inside a SET.
    #[test]
    fn test_value_types() {
        let samples = vec![
            Value::Null,
            Value::Bool(true),
            Value::Int(42),
            Value::Float(1.25),
            Value::String(String::from("hello")),
            Value::Array(vec![Value::Int(1), Value::Int(2), Value::Int(3)]),
        ];

        for expected in samples {
            let original = Message::Set(SetMessage {
                address: String::from("/test"),
                value: expected.clone(),
                revision: None,
                lock: false,
                unlock: false,
            });

            let wire = encode(&original).unwrap();
            let (decoded, _) = decode(&wire).unwrap();

            if let Message::Set(set) = decoded {
                assert_eq!(set.value, expected);
            } else {
                panic!("Expected Set message");
            }
        }
    }

    /// A message serialized as named MessagePack is still accepted by
    /// `decode_message`.
    #[test]
    fn test_backward_compat_v2_decode() {
        let legacy = Message::Set(SetMessage {
            address: String::from("/test/value"),
            value: Value::Float(0.5),
            revision: Some(1),
            lock: false,
            unlock: false,
        });

        let v2_bytes = rmp_serde::to_vec_named(&legacy).unwrap();

        let decoded = decode_message(&v2_bytes).unwrap();

        if let Message::Set(set) = decoded {
            assert_eq!(set.address, "/test/value");
            assert_eq!(set.value.as_f64(), Some(0.5));
        } else {
            panic!("Expected Set message");
        }
    }

    /// PING and PONG carry no body and decode back to themselves.
    #[test]
    fn test_ping_pong() {
        let ping_wire = encode(&Message::Ping).unwrap();
        let (ping_back, _) = decode(&ping_wire).unwrap();
        assert!(matches!(ping_back, Message::Ping));

        let pong_wire = encode(&Message::Pong).unwrap();
        let (pong_back, _) = decode(&pong_wire).unwrap();
        assert!(matches!(pong_back, Message::Pong));
    }

    /// A PUBLISH event keeps its address, signal type and timestamp.
    #[test]
    fn test_publish_event() {
        let original = Message::Publish(PublishMessage {
            address: String::from("/cue/fire"),
            signal: Some(SignalType::Event),
            value: None,
            payload: Some(Value::String(String::from("intro"))),
            samples: None,
            rate: None,
            id: None,
            phase: None,
            timestamp: Some(1234567890),
            timeline: None,
        });

        let wire = encode(&original).unwrap();
        let (decoded, _) = decode(&wire).unwrap();

        if let Message::Publish(pub_msg) = decoded {
            assert_eq!(pub_msg.address, "/cue/fire");
            assert_eq!(pub_msg.signal, Some(SignalType::Event));
            assert_eq!(pub_msg.timestamp, Some(1234567890));
        } else {
            panic!("Expected Publish message");
        }
    }

    /// SUBSCRIBE keeps its id, pattern, type filter and the `max_rate`
    /// option.
    #[test]
    fn test_subscribe_roundtrip() {
        let original = Message::Subscribe(SubscribeMessage {
            id: 42,
            pattern: String::from("/lumen/scene/*/layer/**"),
            types: vec![SignalType::Param, SignalType::Stream],
            options: Some(SubscribeOptions {
                max_rate: Some(60),
                epsilon: Some(0.01),
                history: None,
                window: None,
            }),
        });

        let wire = encode(&original).unwrap();
        let (decoded, _) = decode(&wire).unwrap();

        if let Message::Subscribe(sub) = decoded {
            assert_eq!(sub.id, 42);
            assert_eq!(sub.pattern, "/lumen/scene/*/layer/**");
            assert!(sub.types.contains(&SignalType::Param));
            assert!(sub.types.contains(&SignalType::Stream));
            let options = sub.options.as_ref().unwrap();
            assert_eq!(options.max_rate, Some(60));
        } else {
            panic!("Expected Subscribe message");
        }
    }
}