1use crate::types::*;
14use crate::{Error, Frame, QoS, Result};
15use bytes::{Buf, BufMut, Bytes, BytesMut};
16use std::collections::HashMap;
17
/// Encoding version number exported by this module (currently `1`).
// NOTE(review): `encode` and `encode_with_options` write a literal `1` into
// `frame.flags.version` rather than referencing this constant — presumably the
// two are meant to stay in sync; confirm.
pub const ENCODING_VERSION: u8 = 1;
20
/// Message type codes.
///
/// Each encoder writes one of these as the first byte of a message, and
/// `decode_v3_binary` dispatches on it.
pub mod msg {
    // 0x0_: handshake, announcements and federation
    pub const HELLO: u8 = 0x01;
    pub const WELCOME: u8 = 0x02;
    pub const ANNOUNCE: u8 = 0x03;
    pub const FEDERATION_SYNC: u8 = 0x04;

    // 0x1_: subscriptions
    pub const SUBSCRIBE: u8 = 0x10;
    pub const UNSUBSCRIBE: u8 = 0x11;

    // 0x2_: signal and state traffic
    pub const PUBLISH: u8 = 0x20;
    pub const SET: u8 = 0x21;
    pub const GET: u8 = 0x22;
    pub const SNAPSHOT: u8 = 0x23;
    pub const REPLAY: u8 = 0x24;

    // 0x3_: bundles
    pub const BUNDLE: u8 = 0x30;

    // 0x4_: clock sync and keep-alive
    pub const SYNC: u8 = 0x40;
    pub const PING: u8 = 0x41;
    pub const PONG: u8 = 0x42;

    // 0x5_: acknowledgements and errors
    pub const ACK: u8 = 0x50;
    pub const ERROR: u8 = 0x51;

    // 0x6_: queries
    pub const QUERY: u8 = 0x60;
    pub const RESULT: u8 = 0x61;
}
43
/// Value type codes.
///
/// `value_type_code` maps a `Value` to one of these; the code is written
/// either as its own byte or in the low nibble of the SET flags byte.
pub mod val {
    // Scalars
    pub const NULL: u8 = 0x00;
    pub const BOOL: u8 = 0x01;
    pub const I8: u8 = 0x02;
    pub const I16: u8 = 0x03;
    pub const I32: u8 = 0x04;
    pub const I64: u8 = 0x05;
    pub const F32: u8 = 0x06;
    pub const F64: u8 = 0x07;

    // Length-prefixed data
    pub const STRING: u8 = 0x08;
    pub const BYTES: u8 = 0x09;

    // Containers
    pub const ARRAY: u8 = 0x0A;
    pub const MAP: u8 = 0x0B;
}
59
/// Signal type codes, as produced by `signal_type_code`.
///
/// PUBLISH stores the code in the top three bits of its flags byte, so every
/// value here must stay below 8.
pub mod sig {
    /// Code for `SignalType::Param`.
    pub const PARAM: u8 = 0;
    /// Code for `SignalType::Event`.
    pub const EVENT: u8 = 1;
    /// Code for `SignalType::Stream`.
    pub const STREAM: u8 = 2;
    /// Code for `SignalType::Gesture`.
    pub const GESTURE: u8 = 3;
    /// Code for `SignalType::Timeline`.
    pub const TIMELINE: u8 = 4;
}
68
/// Gesture phase codes, as produced by `gesture_phase_code`.
///
/// PUBLISH stores the code in the low three bits of its flags byte.
pub mod phase {
    /// Code for `GesturePhase::Start`.
    pub const START: u8 = 0;
    /// Code for `GesturePhase::Move`.
    pub const MOVE: u8 = 1;
    /// Code for `GesturePhase::End`.
    pub const END: u8 = 2;
    /// Code for `GesturePhase::Cancel`.
    pub const CANCEL: u8 = 3;
}
76
77#[inline]
83pub fn encode_message(message: &Message) -> Result<Bytes> {
84 let capacity = estimate_message_size(message);
86 let mut buf = BytesMut::with_capacity(capacity);
87 encode_message_to_buf(&mut buf, message)?;
88 Ok(buf.freeze())
89}
90
91#[inline]
93fn estimate_message_size(msg: &Message) -> usize {
94 match msg {
95 Message::Set(m) => {
96 2 + 2
97 + m.address.len()
98 + 9
99 + if m.revision.is_some() { 8 } else { 0 }
100 + if m.ttl.is_some() { 4 } else { 0 }
101 }
102 Message::Publish(m) => 2 + 2 + m.address.len() + 16,
103 Message::Hello(m) => 4 + m.name.len() + 2,
104 Message::Welcome(m) => 12 + m.name.len() + m.session.len() + 4,
105 Message::Subscribe(m) => 6 + m.pattern.len() + 16,
106 Message::Bundle(m) => 12 + m.messages.len() * 48,
107 Message::Replay(m) => 4 + m.pattern.len() + 20,
108 Message::FederationSync(m) => {
109 4 + m.patterns.iter().map(|p| 2 + p.len()).sum::<usize>() + m.revisions.len() * 12 + 16
110 }
111 Message::Ping | Message::Pong => 5, _ => 64, }
114}
115
116#[inline]
118pub fn decode_message(bytes: &[u8]) -> Result<Message> {
119 if bytes.is_empty() {
120 return Err(Error::BufferTooSmall { needed: 1, have: 0 });
121 }
122
123 let first = bytes[0];
124
125 if is_msgpack_map(first) {
128 decode_v2_msgpack(bytes)
130 } else {
131 decode_v3_binary(bytes)
133 }
134}
135
136#[inline]
138pub fn encode(message: &Message) -> Result<Bytes> {
139 let payload = encode_message(message)?;
140 let mut frame = Frame::new(payload).with_qos(message.default_qos());
141 frame.flags.version = 1; frame.encode()
143}
144
145pub fn encode_with_options(
147 message: &Message,
148 qos: Option<QoS>,
149 timestamp: Option<u64>,
150) -> Result<Bytes> {
151 let payload = encode_message(message)?;
152 let mut frame = Frame::new(payload);
153 frame.flags.version = 1; if let Some(qos) = qos {
156 frame = frame.with_qos(qos);
157 } else {
158 frame = frame.with_qos(message.default_qos());
159 }
160
161 if let Some(ts) = timestamp {
162 frame = frame.with_timestamp(ts);
163 }
164
165 frame.encode()
166}
167
168#[inline]
170pub fn decode(bytes: &[u8]) -> Result<(Message, Frame)> {
171 let frame = Frame::decode(bytes)?;
172 let message = decode_message(&frame.payload)?;
173 Ok((message, frame))
174}
175
176pub fn encode_payload(message: &Message) -> Result<Vec<u8>> {
178 let bytes = encode_message(message)?;
179 Ok(bytes.to_vec())
180}
181
182pub fn decode_payload(bytes: &[u8]) -> Result<Message> {
184 decode_message(bytes)
185}
186
187fn encode_message_to_buf(buf: &mut BytesMut, msg: &Message) -> Result<()> {
192 match msg {
193 Message::Hello(m) => encode_hello(buf, m),
194 Message::Welcome(m) => encode_welcome(buf, m),
195 Message::Announce(m) => encode_announce(buf, m),
196 Message::Subscribe(m) => encode_subscribe(buf, m),
197 Message::Unsubscribe(m) => encode_unsubscribe(buf, m),
198 Message::Publish(m) => encode_publish(buf, m),
199 Message::Set(m) => encode_set(buf, m),
200 Message::Get(m) => encode_get(buf, m),
201 Message::Snapshot(m) => encode_snapshot(buf, m),
202 Message::Replay(m) => encode_replay(buf, m),
203 Message::FederationSync(m) => encode_federation_sync(buf, m),
204 Message::Bundle(m) => encode_bundle(buf, m),
205 Message::Sync(m) => encode_sync(buf, m),
206 Message::Ping => {
207 buf.put_u8(msg::PING);
208 Ok(())
209 }
210 Message::Pong => {
211 buf.put_u8(msg::PONG);
212 Ok(())
213 }
214 Message::Ack(m) => encode_ack(buf, m),
215 Message::Error(m) => encode_error(buf, m),
216 Message::Query(m) => encode_query(buf, m),
217 Message::Result(m) => encode_result(buf, m),
218 }
219}
220
221#[inline]
224fn encode_set(buf: &mut BytesMut, msg: &SetMessage) -> Result<()> {
225 buf.put_u8(msg::SET);
226
227 let vtype = value_type_code(&msg.value);
228 let mut flags = vtype & 0x0F;
229 if msg.revision.is_some() {
230 flags |= 0x80;
231 }
232 if msg.lock {
233 flags |= 0x40;
234 }
235 if msg.unlock {
236 flags |= 0x20;
237 }
238 if msg.ttl.is_some() {
239 flags |= 0x10;
240 }
241 buf.put_u8(flags);
242
243 encode_string(buf, &msg.address)?;
245
246 encode_value_data(buf, &msg.value)?;
248
249 if let Some(rev) = msg.revision {
251 buf.put_u64(rev);
252 }
253
254 if let Some(ttl) = msg.ttl {
256 let raw = match ttl {
257 Ttl::Never => 0u32,
258 Ttl::Sliding(secs) => secs & 0x7FFF_FFFF,
259 Ttl::Absolute(secs) => (secs & 0x7FFF_FFFF) | 0x8000_0000,
260 };
261 buf.put_u32(raw);
262 }
263
264 Ok(())
265}
266
267fn encode_publish(buf: &mut BytesMut, msg: &PublishMessage) -> Result<()> {
270 buf.put_u8(msg::PUBLISH);
271
272 let sig_code = msg.signal.map(signal_type_code).unwrap_or(sig::EVENT);
273 let phase_code = msg.phase.map(gesture_phase_code).unwrap_or(phase::START);
274
275 let mut flags: u8 = (sig_code & 0x07) << 5;
276 if msg.timestamp.is_some() {
277 flags |= 0x10;
278 }
279 if msg.id.is_some() {
280 flags |= 0x08;
281 }
282 flags |= phase_code & 0x07;
283 buf.put_u8(flags);
284
285 encode_string(buf, &msg.address)?;
287
288 if let Some(ref value) = msg.value {
290 buf.put_u8(1); buf.put_u8(value_type_code(value));
292 encode_value_data(buf, value)?;
293 } else if let Some(ref payload) = msg.payload {
294 buf.put_u8(1); buf.put_u8(value_type_code(payload));
296 encode_value_data(buf, payload)?;
297 } else if let Some(ref samples) = msg.samples {
298 buf.put_u8(2); buf.put_u16(samples.len() as u16);
300 for sample in samples {
301 buf.put_f64(*sample);
302 }
303 } else {
304 buf.put_u8(0); }
306
307 if let Some(ts) = msg.timestamp {
309 buf.put_u64(ts);
310 }
311
312 if let Some(id) = msg.id {
314 buf.put_u32(id);
315 }
316
317 if let Some(rate) = msg.rate {
319 buf.put_u32(rate);
320 }
321
322 Ok(())
323}
324
325fn encode_hello(buf: &mut BytesMut, msg: &HelloMessage) -> Result<()> {
327 buf.put_u8(msg::HELLO);
328 buf.put_u8(msg.version);
329
330 let mut features: u8 = 0;
332 for f in &msg.features {
333 match f.as_str() {
334 "param" => features |= 0x80,
335 "event" => features |= 0x40,
336 "stream" => features |= 0x20,
337 "gesture" => features |= 0x10,
338 "timeline" => features |= 0x08,
339 "federation" => features |= 0x04,
340 _ => {}
341 }
342 }
343 buf.put_u8(features);
344
345 encode_string(buf, &msg.name)?;
347
348 if let Some(ref token) = msg.token {
350 encode_string(buf, token)?;
351 } else {
352 buf.put_u16(0);
353 }
354
355 Ok(())
356}
357
358fn encode_welcome(buf: &mut BytesMut, msg: &WelcomeMessage) -> Result<()> {
360 buf.put_u8(msg::WELCOME);
361 buf.put_u8(msg.version);
362
363 let mut features: u8 = 0;
365 for f in &msg.features {
366 match f.as_str() {
367 "param" => features |= 0x80,
368 "event" => features |= 0x40,
369 "stream" => features |= 0x20,
370 "gesture" => features |= 0x10,
371 "timeline" => features |= 0x08,
372 _ => {}
373 }
374 }
375 buf.put_u8(features);
376
377 buf.put_u64(msg.time);
379
380 encode_string(buf, &msg.session)?;
382
383 encode_string(buf, &msg.name)?;
385
386 if let Some(ref token) = msg.token {
388 encode_string(buf, token)?;
389 } else {
390 buf.put_u16(0);
391 }
392
393 Ok(())
394}
395
396fn encode_announce(buf: &mut BytesMut, msg: &AnnounceMessage) -> Result<()> {
398 buf.put_u8(msg::ANNOUNCE);
399
400 encode_string(buf, &msg.namespace)?;
401 buf.put_u16(msg.signals.len() as u16);
402
403 for sig in &msg.signals {
404 encode_string(buf, &sig.address)?;
405 buf.put_u8(signal_type_code(sig.signal_type));
406
407 let mut opt_flags: u8 = 0;
409 if sig.datatype.is_some() {
410 opt_flags |= 0x01;
411 }
412 if sig.access.is_some() {
413 opt_flags |= 0x02;
414 }
415 if sig.meta.is_some() {
416 opt_flags |= 0x04;
417 }
418 buf.put_u8(opt_flags);
419
420 if let Some(ref dt) = sig.datatype {
421 encode_string(buf, dt)?;
422 }
423 if let Some(ref access) = sig.access {
424 encode_string(buf, access)?;
425 }
426 if let Some(ref meta) = sig.meta {
427 let mut meta_flags: u8 = 0;
429 if meta.unit.is_some() {
430 meta_flags |= 0x01;
431 }
432 if meta.range.is_some() {
433 meta_flags |= 0x02;
434 }
435 if meta.default.is_some() {
436 meta_flags |= 0x04;
437 }
438 if meta.description.is_some() {
439 meta_flags |= 0x08;
440 }
441 buf.put_u8(meta_flags);
442
443 if let Some(ref unit) = meta.unit {
444 encode_string(buf, unit)?;
445 }
446 if let Some((min, max)) = meta.range {
447 buf.put_f64(min);
448 buf.put_f64(max);
449 }
450 if let Some(ref default) = meta.default {
451 buf.put_u8(value_type_code(default));
452 encode_value_data(buf, default)?;
453 }
454 if let Some(ref desc) = meta.description {
455 encode_string(buf, desc)?;
456 }
457 }
458 }
459
460 Ok(())
461}
462
463fn encode_subscribe(buf: &mut BytesMut, msg: &SubscribeMessage) -> Result<()> {
465 buf.put_u8(msg::SUBSCRIBE);
466 buf.put_u32(msg.id);
467
468 encode_string(buf, &msg.pattern)?;
469
470 let mut type_mask: u8 = 0;
472 if msg.types.is_empty() {
473 type_mask = 0xFF; } else {
475 for t in &msg.types {
476 match t {
477 SignalType::Param => type_mask |= 0x01,
478 SignalType::Event => type_mask |= 0x02,
479 SignalType::Stream => type_mask |= 0x04,
480 SignalType::Gesture => type_mask |= 0x08,
481 SignalType::Timeline => type_mask |= 0x10,
482 }
483 }
484 }
485 buf.put_u8(type_mask);
486
487 if let Some(ref opts) = msg.options {
489 let mut opt_flags: u8 = 0;
490 if opts.max_rate.is_some() {
491 opt_flags |= 0x01;
492 }
493 if opts.epsilon.is_some() {
494 opt_flags |= 0x02;
495 }
496 if opts.history.is_some() {
497 opt_flags |= 0x04;
498 }
499 if opts.window.is_some() {
500 opt_flags |= 0x08;
501 }
502 buf.put_u8(opt_flags);
503
504 if let Some(rate) = opts.max_rate {
505 buf.put_u32(rate);
506 }
507 if let Some(eps) = opts.epsilon {
508 buf.put_f64(eps);
509 }
510 if let Some(hist) = opts.history {
511 buf.put_u32(hist);
512 }
513 if let Some(win) = opts.window {
514 buf.put_u32(win);
515 }
516 } else {
517 buf.put_u8(0); }
519
520 Ok(())
521}
522
523fn encode_unsubscribe(buf: &mut BytesMut, msg: &UnsubscribeMessage) -> Result<()> {
525 buf.put_u8(msg::UNSUBSCRIBE);
526 buf.put_u32(msg.id);
527 Ok(())
528}
529
530fn encode_get(buf: &mut BytesMut, msg: &GetMessage) -> Result<()> {
532 buf.put_u8(msg::GET);
533 encode_string(buf, &msg.address)?;
534 Ok(())
535}
536
537fn encode_snapshot(buf: &mut BytesMut, msg: &SnapshotMessage) -> Result<()> {
539 buf.put_u8(msg::SNAPSHOT);
540 buf.put_u16(msg.params.len() as u16);
541
542 for param in &msg.params {
543 encode_string(buf, ¶m.address)?;
544 buf.put_u8(value_type_code(¶m.value));
545 encode_value_data(buf, ¶m.value)?;
546 buf.put_u64(param.revision);
547
548 let mut opt_flags: u8 = 0;
549 if param.writer.is_some() {
550 opt_flags |= 0x01;
551 }
552 if param.timestamp.is_some() {
553 opt_flags |= 0x02;
554 }
555 buf.put_u8(opt_flags);
556
557 if let Some(ref writer) = param.writer {
558 encode_string(buf, writer)?;
559 }
560 if let Some(ts) = param.timestamp {
561 buf.put_u64(ts);
562 }
563 }
564
565 Ok(())
566}
567
568fn encode_replay(buf: &mut BytesMut, msg: &ReplayMessage) -> Result<()> {
571 buf.put_u8(msg::REPLAY);
572
573 let mut flags: u8 = 0;
574 if msg.from.is_some() {
575 flags |= 0x80;
576 }
577 if msg.to.is_some() {
578 flags |= 0x40;
579 }
580 if msg.limit.is_some() {
581 flags |= 0x20;
582 }
583 buf.put_u8(flags);
584
585 encode_string(buf, &msg.pattern)?;
586
587 if let Some(from) = msg.from {
588 buf.put_u64(from);
589 }
590 if let Some(to) = msg.to {
591 buf.put_u64(to);
592 }
593 if let Some(limit) = msg.limit {
594 buf.put_u32(limit);
595 }
596
597 let mut type_mask: u8 = 0;
599 if msg.types.is_empty() {
600 type_mask = 0xFF; } else {
602 for t in &msg.types {
603 match t {
604 SignalType::Param => type_mask |= 0x01,
605 SignalType::Event => type_mask |= 0x02,
606 SignalType::Stream => type_mask |= 0x04,
607 SignalType::Gesture => type_mask |= 0x08,
608 SignalType::Timeline => type_mask |= 0x10,
609 }
610 }
611 }
612 buf.put_u8(type_mask);
613
614 Ok(())
615}
616
617fn encode_federation_sync(buf: &mut BytesMut, msg: &FederationSyncMessage) -> Result<()> {
620 buf.put_u8(msg::FEDERATION_SYNC);
621 buf.put_u8(msg.op as u8);
622
623 buf.put_u16(msg.patterns.len() as u16);
625 for pattern in &msg.patterns {
626 encode_string(buf, pattern)?;
627 }
628
629 buf.put_u16(msg.revisions.len() as u16);
631 for (key, val) in &msg.revisions {
632 encode_string(buf, key)?;
633 buf.put_u64(*val);
634 }
635
636 let mut flags: u8 = 0;
638 if msg.since_revision.is_some() {
639 flags |= 0x80;
640 }
641 if msg.origin.is_some() {
642 flags |= 0x40;
643 }
644 buf.put_u8(flags);
645
646 if let Some(since) = msg.since_revision {
647 buf.put_u64(since);
648 }
649 if let Some(ref origin) = msg.origin {
650 encode_string(buf, origin)?;
651 }
652
653 Ok(())
654}
655
656fn encode_bundle(buf: &mut BytesMut, msg: &BundleMessage) -> Result<()> {
658 buf.put_u8(msg::BUNDLE);
659
660 let mut flags: u8 = 0;
661 if msg.timestamp.is_some() {
662 flags |= 0x80;
663 }
664 buf.put_u8(flags);
665
666 buf.put_u16(msg.messages.len() as u16);
667
668 if let Some(ts) = msg.timestamp {
669 buf.put_u64(ts);
670 }
671
672 for inner_msg in &msg.messages {
674 let mut inner_buf = BytesMut::with_capacity(64);
675 encode_message_to_buf(&mut inner_buf, inner_msg)?;
676 buf.put_u16(inner_buf.len() as u16);
677 buf.extend_from_slice(&inner_buf);
678 }
679
680 Ok(())
681}
682
683fn encode_sync(buf: &mut BytesMut, msg: &SyncMessage) -> Result<()> {
685 buf.put_u8(msg::SYNC);
686
687 let mut flags: u8 = 0;
688 if msg.t2.is_some() {
689 flags |= 0x01;
690 }
691 if msg.t3.is_some() {
692 flags |= 0x02;
693 }
694 buf.put_u8(flags);
695
696 buf.put_u64(msg.t1);
697 if let Some(t2) = msg.t2 {
698 buf.put_u64(t2);
699 }
700 if let Some(t3) = msg.t3 {
701 buf.put_u64(t3);
702 }
703
704 Ok(())
705}
706
707fn encode_ack(buf: &mut BytesMut, msg: &AckMessage) -> Result<()> {
709 buf.put_u8(msg::ACK);
710
711 let mut flags: u8 = 0;
712 if msg.address.is_some() {
713 flags |= 0x01;
714 }
715 if msg.revision.is_some() {
716 flags |= 0x02;
717 }
718 if msg.locked.is_some() {
719 flags |= 0x04;
720 }
721 if msg.holder.is_some() {
722 flags |= 0x08;
723 }
724 if msg.correlation_id.is_some() {
725 flags |= 0x10;
726 }
727 buf.put_u8(flags);
728
729 if let Some(ref addr) = msg.address {
730 encode_string(buf, addr)?;
731 }
732 if let Some(rev) = msg.revision {
733 buf.put_u64(rev);
734 }
735 if let Some(locked) = msg.locked {
736 buf.put_u8(if locked { 1 } else { 0 });
737 }
738 if let Some(ref holder) = msg.holder {
739 encode_string(buf, holder)?;
740 }
741 if let Some(corr) = msg.correlation_id {
742 buf.put_u32(corr);
743 }
744
745 Ok(())
746}
747
748fn encode_error(buf: &mut BytesMut, msg: &ErrorMessage) -> Result<()> {
750 buf.put_u8(msg::ERROR);
751 buf.put_u16(msg.code);
752 encode_string(buf, &msg.message)?;
753
754 let mut flags: u8 = 0;
755 if msg.address.is_some() {
756 flags |= 0x01;
757 }
758 if msg.correlation_id.is_some() {
759 flags |= 0x02;
760 }
761 buf.put_u8(flags);
762
763 if let Some(ref addr) = msg.address {
764 encode_string(buf, addr)?;
765 }
766 if let Some(corr) = msg.correlation_id {
767 buf.put_u32(corr);
768 }
769
770 Ok(())
771}
772
773fn encode_query(buf: &mut BytesMut, msg: &QueryMessage) -> Result<()> {
775 buf.put_u8(msg::QUERY);
776 encode_string(buf, &msg.pattern)?;
777 Ok(())
778}
779
780fn encode_result(buf: &mut BytesMut, msg: &ResultMessage) -> Result<()> {
782 buf.put_u8(msg::RESULT);
783 buf.put_u16(msg.signals.len() as u16);
784
785 for sig in &msg.signals {
786 encode_string(buf, &sig.address)?;
787 buf.put_u8(signal_type_code(sig.signal_type));
788
789 let mut opt_flags: u8 = 0;
790 if sig.datatype.is_some() {
791 opt_flags |= 0x01;
792 }
793 if sig.access.is_some() {
794 opt_flags |= 0x02;
795 }
796 buf.put_u8(opt_flags);
797
798 if let Some(ref dt) = sig.datatype {
799 encode_string(buf, dt)?;
800 }
801 if let Some(ref access) = sig.access {
802 encode_string(buf, access)?;
803 }
804 }
805
806 Ok(())
807}
808
809#[inline(always)]
814fn encode_string(buf: &mut BytesMut, s: &str) -> Result<()> {
815 let bytes = s.as_bytes();
816 if bytes.len() > u16::MAX as usize {
817 return Err(Error::PayloadTooLarge(bytes.len()));
818 }
819 buf.put_u16(bytes.len() as u16);
820 buf.extend_from_slice(bytes);
821 Ok(())
822}
823
824#[inline]
825fn encode_value_data(buf: &mut BytesMut, value: &Value) -> Result<()> {
826 match value {
827 Value::Null => {} Value::Bool(b) => buf.put_u8(if *b { 1 } else { 0 }),
829 Value::Int(i) => buf.put_i64(*i),
830 Value::Float(f) => buf.put_f64(*f),
831 Value::String(s) => encode_string(buf, s)?,
832 Value::Bytes(b) => {
833 buf.put_u16(b.len() as u16);
834 buf.extend_from_slice(b);
835 }
836 Value::Array(arr) => {
837 buf.put_u16(arr.len() as u16);
838 for item in arr {
839 buf.put_u8(value_type_code(item));
840 encode_value_data(buf, item)?;
841 }
842 }
843 Value::Map(map) => {
844 buf.put_u16(map.len() as u16);
845 for (key, val) in map {
846 encode_string(buf, key)?;
847 buf.put_u8(value_type_code(val));
848 encode_value_data(buf, val)?;
849 }
850 }
851 }
852 Ok(())
853}
854
855#[inline(always)]
856fn value_type_code(value: &Value) -> u8 {
857 match value {
858 Value::Null => val::NULL,
859 Value::Bool(_) => val::BOOL,
860 Value::Int(_) => val::I64,
861 Value::Float(_) => val::F64,
862 Value::String(_) => val::STRING,
863 Value::Bytes(_) => val::BYTES,
864 Value::Array(_) => val::ARRAY,
865 Value::Map(_) => val::MAP,
866 }
867}
868
869fn signal_type_code(sig: SignalType) -> u8 {
870 match sig {
871 SignalType::Param => sig::PARAM,
872 SignalType::Event => sig::EVENT,
873 SignalType::Stream => sig::STREAM,
874 SignalType::Gesture => sig::GESTURE,
875 SignalType::Timeline => sig::TIMELINE,
876 }
877}
878
879fn gesture_phase_code(phase: GesturePhase) -> u8 {
880 match phase {
881 GesturePhase::Start => phase::START,
882 GesturePhase::Move => phase::MOVE,
883 GesturePhase::End => phase::END,
884 GesturePhase::Cancel => phase::CANCEL,
885 }
886}
887
888fn decode_v3_binary(bytes: &[u8]) -> Result<Message> {
893 if bytes.is_empty() {
894 return Err(Error::BufferTooSmall { needed: 1, have: 0 });
895 }
896
897 let mut buf = bytes;
898 let msg_type = buf.get_u8();
899
900 match msg_type {
901 msg::HELLO => decode_hello(&mut buf),
902 msg::WELCOME => decode_welcome(&mut buf),
903 msg::ANNOUNCE => decode_announce(&mut buf),
904 msg::SUBSCRIBE => decode_subscribe(&mut buf),
905 msg::UNSUBSCRIBE => decode_unsubscribe(&mut buf),
906 msg::PUBLISH => decode_publish(&mut buf),
907 msg::SET => decode_set(&mut buf),
908 msg::GET => decode_get(&mut buf),
909 msg::SNAPSHOT => decode_snapshot(&mut buf),
910 msg::REPLAY => decode_replay(&mut buf),
911 msg::FEDERATION_SYNC => decode_federation_sync(&mut buf),
912 msg::BUNDLE => decode_bundle(&mut buf),
913 msg::SYNC => decode_sync(&mut buf),
914 msg::PING => Ok(Message::Ping),
915 msg::PONG => Ok(Message::Pong),
916 msg::ACK => decode_ack(&mut buf),
917 msg::ERROR => decode_error(&mut buf),
918 msg::QUERY => decode_query(&mut buf),
919 msg::RESULT => decode_result(&mut buf),
920 _ => Err(Error::UnknownMessageType(msg_type)),
921 }
922}
923
924#[inline]
925fn decode_set(buf: &mut &[u8]) -> Result<Message> {
926 let flags = buf.get_u8();
927 let vtype = flags & 0x0F;
928 let has_rev = (flags & 0x80) != 0;
929 let lock = (flags & 0x40) != 0;
930 let unlock = (flags & 0x20) != 0;
931 let has_ttl = (flags & 0x10) != 0;
932
933 let address = decode_string(buf)?;
934 let value = decode_value_data(buf, vtype)?;
935
936 let revision = if has_rev { Some(buf.get_u64()) } else { None };
937
938 let ttl = if has_ttl {
939 let raw = buf.get_u32();
940 if raw == 0 {
941 Some(Ttl::Never)
942 } else if raw & 0x8000_0000 != 0 {
943 Some(Ttl::Absolute(raw & 0x7FFF_FFFF))
944 } else {
945 Some(Ttl::Sliding(raw))
946 }
947 } else {
948 None
949 };
950
951 Ok(Message::Set(SetMessage {
952 address,
953 value,
954 revision,
955 lock,
956 unlock,
957 ttl,
958 }))
959}
960
961fn decode_publish(buf: &mut &[u8]) -> Result<Message> {
962 let flags = buf.get_u8();
963 let sig_code = (flags >> 5) & 0x07;
964 let has_ts = (flags & 0x10) != 0;
965 let has_id = (flags & 0x08) != 0;
966 let phase_code = flags & 0x07;
967
968 let address = decode_string(buf)?;
969
970 let value_indicator = buf.get_u8();
972 let (value, payload, samples) = match value_indicator {
973 0 => (None, None, None),
974 1 => {
975 let vtype = buf.get_u8();
976 let v = decode_value_data(buf, vtype)?;
977 (Some(v), None, None)
978 }
979 2 => {
980 let count = buf.get_u16() as usize;
981 let mut s = Vec::with_capacity(count);
982 for _ in 0..count {
983 s.push(buf.get_f64());
984 }
985 (None, None, Some(s))
986 }
987 _ => (None, None, None),
988 };
989
990 let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
991 let id = if has_id { Some(buf.get_u32()) } else { None };
992
993 let rate = if buf.remaining() >= 4 {
995 Some(buf.get_u32())
996 } else {
997 None
998 };
999
1000 let signal = Some(signal_type_from_code(sig_code));
1001 let phase = Some(gesture_phase_from_code(phase_code));
1002
1003 Ok(Message::Publish(PublishMessage {
1004 address,
1005 signal,
1006 value,
1007 payload,
1008 samples,
1009 rate,
1010 id,
1011 phase,
1012 timestamp,
1013 timeline: None, }))
1015}
1016
1017fn decode_hello(buf: &mut &[u8]) -> Result<Message> {
1018 let version = buf.get_u8();
1019 let feature_flags = buf.get_u8();
1020
1021 let mut features = Vec::new();
1022 if feature_flags & 0x80 != 0 {
1023 features.push("param".to_string());
1024 }
1025 if feature_flags & 0x40 != 0 {
1026 features.push("event".to_string());
1027 }
1028 if feature_flags & 0x20 != 0 {
1029 features.push("stream".to_string());
1030 }
1031 if feature_flags & 0x10 != 0 {
1032 features.push("gesture".to_string());
1033 }
1034 if feature_flags & 0x08 != 0 {
1035 features.push("timeline".to_string());
1036 }
1037 if feature_flags & 0x04 != 0 {
1038 features.push("federation".to_string());
1039 }
1040
1041 let name = decode_string(buf)?;
1042 let token_str = decode_string(buf)?;
1043 let token = if token_str.is_empty() {
1044 None
1045 } else {
1046 Some(token_str)
1047 };
1048
1049 Ok(Message::Hello(HelloMessage {
1050 version,
1051 name,
1052 features,
1053 capabilities: None,
1054 token,
1055 }))
1056}
1057
1058fn decode_welcome(buf: &mut &[u8]) -> Result<Message> {
1059 let version = buf.get_u8();
1060 let feature_flags = buf.get_u8();
1061
1062 let mut features = Vec::new();
1063 if feature_flags & 0x80 != 0 {
1064 features.push("param".to_string());
1065 }
1066 if feature_flags & 0x40 != 0 {
1067 features.push("event".to_string());
1068 }
1069 if feature_flags & 0x20 != 0 {
1070 features.push("stream".to_string());
1071 }
1072 if feature_flags & 0x10 != 0 {
1073 features.push("gesture".to_string());
1074 }
1075 if feature_flags & 0x08 != 0 {
1076 features.push("timeline".to_string());
1077 }
1078
1079 let time = buf.get_u64();
1080 let session = decode_string(buf)?;
1081 let name = decode_string(buf)?;
1082
1083 let token_str = decode_string(buf)?;
1084 let token = if token_str.is_empty() {
1085 None
1086 } else {
1087 Some(token_str)
1088 };
1089
1090 Ok(Message::Welcome(WelcomeMessage {
1091 version,
1092 session,
1093 name,
1094 features,
1095 time,
1096 token,
1097 }))
1098}
1099
1100fn decode_announce(buf: &mut &[u8]) -> Result<Message> {
1101 let namespace = decode_string(buf)?;
1102 let count = buf.get_u16() as usize;
1103
1104 let mut signals = Vec::with_capacity(count);
1105 for _ in 0..count {
1106 let address = decode_string(buf)?;
1107 let sig_code = buf.get_u8();
1108 let opt_flags = buf.get_u8();
1109
1110 let datatype = if opt_flags & 0x01 != 0 {
1111 Some(decode_string(buf)?)
1112 } else {
1113 None
1114 };
1115 let access = if opt_flags & 0x02 != 0 {
1116 Some(decode_string(buf)?)
1117 } else {
1118 None
1119 };
1120
1121 let meta = if opt_flags & 0x04 != 0 {
1122 let meta_flags = buf.get_u8();
1123
1124 let unit = if meta_flags & 0x01 != 0 {
1125 Some(decode_string(buf)?)
1126 } else {
1127 None
1128 };
1129 let range = if meta_flags & 0x02 != 0 {
1130 let min = buf.get_f64();
1131 let max = buf.get_f64();
1132 Some((min, max))
1133 } else {
1134 None
1135 };
1136 let default = if meta_flags & 0x04 != 0 {
1137 let vtype = buf.get_u8();
1138 Some(decode_value_data(buf, vtype)?)
1139 } else {
1140 None
1141 };
1142 let description = if meta_flags & 0x08 != 0 {
1143 Some(decode_string(buf)?)
1144 } else {
1145 None
1146 };
1147
1148 Some(SignalMeta {
1149 unit,
1150 range,
1151 default,
1152 description,
1153 })
1154 } else {
1155 None
1156 };
1157
1158 signals.push(SignalDefinition {
1159 address,
1160 signal_type: signal_type_from_code(sig_code),
1161 datatype,
1162 access,
1163 meta,
1164 });
1165 }
1166
1167 Ok(Message::Announce(AnnounceMessage {
1168 namespace,
1169 signals,
1170 meta: None,
1171 }))
1172}
1173
1174fn decode_subscribe(buf: &mut &[u8]) -> Result<Message> {
1175 let id = buf.get_u32();
1176 let pattern = decode_string(buf)?;
1177 let type_mask = buf.get_u8();
1178
1179 let mut types = Vec::new();
1180 if type_mask == 0xFF {
1181 } else {
1183 if type_mask & 0x01 != 0 {
1184 types.push(SignalType::Param);
1185 }
1186 if type_mask & 0x02 != 0 {
1187 types.push(SignalType::Event);
1188 }
1189 if type_mask & 0x04 != 0 {
1190 types.push(SignalType::Stream);
1191 }
1192 if type_mask & 0x08 != 0 {
1193 types.push(SignalType::Gesture);
1194 }
1195 if type_mask & 0x10 != 0 {
1196 types.push(SignalType::Timeline);
1197 }
1198 }
1199
1200 let opt_flags = buf.get_u8();
1201 let options = if opt_flags != 0 {
1202 let max_rate = if opt_flags & 0x01 != 0 {
1203 Some(buf.get_u32())
1204 } else {
1205 None
1206 };
1207 let epsilon = if opt_flags & 0x02 != 0 {
1208 Some(buf.get_f64())
1209 } else {
1210 None
1211 };
1212 let history = if opt_flags & 0x04 != 0 {
1213 Some(buf.get_u32())
1214 } else {
1215 None
1216 };
1217 let window = if opt_flags & 0x08 != 0 {
1218 Some(buf.get_u32())
1219 } else {
1220 None
1221 };
1222
1223 Some(SubscribeOptions {
1224 max_rate,
1225 epsilon,
1226 history,
1227 window,
1228 })
1229 } else {
1230 None
1231 };
1232
1233 Ok(Message::Subscribe(SubscribeMessage {
1234 id,
1235 pattern,
1236 types,
1237 options,
1238 }))
1239}
1240
1241fn decode_unsubscribe(buf: &mut &[u8]) -> Result<Message> {
1242 let id = buf.get_u32();
1243 Ok(Message::Unsubscribe(UnsubscribeMessage { id }))
1244}
1245
1246fn decode_get(buf: &mut &[u8]) -> Result<Message> {
1247 let address = decode_string(buf)?;
1248 Ok(Message::Get(GetMessage { address }))
1249}
1250
1251fn decode_snapshot(buf: &mut &[u8]) -> Result<Message> {
1252 let count = buf.get_u16() as usize;
1253 let mut params = Vec::with_capacity(count);
1254
1255 for _ in 0..count {
1256 let address = decode_string(buf)?;
1257 let vtype = buf.get_u8();
1258 let value = decode_value_data(buf, vtype)?;
1259 let revision = buf.get_u64();
1260 let opt_flags = buf.get_u8();
1261
1262 let writer = if opt_flags & 0x01 != 0 {
1263 Some(decode_string(buf)?)
1264 } else {
1265 None
1266 };
1267 let timestamp = if opt_flags & 0x02 != 0 {
1268 Some(buf.get_u64())
1269 } else {
1270 None
1271 };
1272
1273 params.push(ParamValue {
1274 address,
1275 value,
1276 revision,
1277 writer,
1278 timestamp,
1279 });
1280 }
1281
1282 Ok(Message::Snapshot(SnapshotMessage { params }))
1283}
1284
1285fn decode_replay(buf: &mut &[u8]) -> Result<Message> {
1286 let flags = buf.get_u8();
1287 let has_from = (flags & 0x80) != 0;
1288 let has_to = (flags & 0x40) != 0;
1289 let has_limit = (flags & 0x20) != 0;
1290
1291 let pattern = decode_string(buf)?;
1292
1293 let from = if has_from { Some(buf.get_u64()) } else { None };
1294 let to = if has_to { Some(buf.get_u64()) } else { None };
1295 let limit = if has_limit { Some(buf.get_u32()) } else { None };
1296
1297 let type_mask = buf.get_u8();
1298 let mut types = Vec::new();
1299 if type_mask != 0xFF {
1300 if type_mask & 0x01 != 0 {
1301 types.push(SignalType::Param);
1302 }
1303 if type_mask & 0x02 != 0 {
1304 types.push(SignalType::Event);
1305 }
1306 if type_mask & 0x04 != 0 {
1307 types.push(SignalType::Stream);
1308 }
1309 if type_mask & 0x08 != 0 {
1310 types.push(SignalType::Gesture);
1311 }
1312 if type_mask & 0x10 != 0 {
1313 types.push(SignalType::Timeline);
1314 }
1315 }
1316
1317 Ok(Message::Replay(ReplayMessage {
1318 pattern,
1319 from,
1320 to,
1321 limit,
1322 types,
1323 }))
1324}
1325
1326fn decode_federation_sync(buf: &mut &[u8]) -> Result<Message> {
1327 let op_byte = buf.get_u8();
1328 let op = match op_byte {
1329 0x01 => FederationOp::DeclareNamespaces,
1330 0x02 => FederationOp::RequestSync,
1331 0x03 => FederationOp::RevisionVector,
1332 0x04 => FederationOp::SyncComplete,
1333 _ => return Err(Error::DecodeError("unknown federation op".into())),
1334 };
1335
1336 let pattern_count = buf.get_u16() as usize;
1338 let mut patterns = Vec::with_capacity(pattern_count);
1339 for _ in 0..pattern_count {
1340 patterns.push(decode_string(buf)?);
1341 }
1342
1343 let revision_count = buf.get_u16() as usize;
1345 let mut revisions = std::collections::HashMap::with_capacity(revision_count);
1346 for _ in 0..revision_count {
1347 let key = decode_string(buf)?;
1348 let val = buf.get_u64();
1349 revisions.insert(key, val);
1350 }
1351
1352 let flags = buf.get_u8();
1354 let since_revision = if flags & 0x80 != 0 {
1355 Some(buf.get_u64())
1356 } else {
1357 None
1358 };
1359 let origin = if flags & 0x40 != 0 {
1360 Some(decode_string(buf)?)
1361 } else {
1362 None
1363 };
1364
1365 Ok(Message::FederationSync(FederationSyncMessage {
1366 op,
1367 patterns,
1368 revisions,
1369 since_revision,
1370 origin,
1371 }))
1372}
1373
1374fn decode_bundle(buf: &mut &[u8]) -> Result<Message> {
1375 let flags = buf.get_u8();
1376 let has_ts = (flags & 0x80) != 0;
1377 let count = buf.get_u16() as usize;
1378
1379 let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
1380
1381 let mut messages = Vec::with_capacity(count);
1382 for _ in 0..count {
1383 let len = buf.get_u16() as usize;
1384 let inner_bytes = &buf[..len];
1385 buf.advance(len);
1386 messages.push(decode_v3_binary(inner_bytes)?);
1387 }
1388
1389 Ok(Message::Bundle(BundleMessage {
1390 timestamp,
1391 messages,
1392 }))
1393}
1394
1395fn decode_sync(buf: &mut &[u8]) -> Result<Message> {
1396 let flags = buf.get_u8();
1397 let t1 = buf.get_u64();
1398 let t2 = if flags & 0x01 != 0 {
1399 Some(buf.get_u64())
1400 } else {
1401 None
1402 };
1403 let t3 = if flags & 0x02 != 0 {
1404 Some(buf.get_u64())
1405 } else {
1406 None
1407 };
1408
1409 Ok(Message::Sync(SyncMessage { t1, t2, t3 }))
1410}
1411
1412fn decode_ack(buf: &mut &[u8]) -> Result<Message> {
1413 let flags = buf.get_u8();
1414
1415 let address = if flags & 0x01 != 0 {
1416 Some(decode_string(buf)?)
1417 } else {
1418 None
1419 };
1420 let revision = if flags & 0x02 != 0 {
1421 Some(buf.get_u64())
1422 } else {
1423 None
1424 };
1425 let locked = if flags & 0x04 != 0 {
1426 Some(buf.get_u8() != 0)
1427 } else {
1428 None
1429 };
1430 let holder = if flags & 0x08 != 0 {
1431 Some(decode_string(buf)?)
1432 } else {
1433 None
1434 };
1435 let correlation_id = if flags & 0x10 != 0 {
1436 Some(buf.get_u32())
1437 } else {
1438 None
1439 };
1440
1441 Ok(Message::Ack(AckMessage {
1442 address,
1443 revision,
1444 locked,
1445 holder,
1446 correlation_id,
1447 }))
1448}
1449
1450fn decode_error(buf: &mut &[u8]) -> Result<Message> {
1451 let code = buf.get_u16();
1452 let message = decode_string(buf)?;
1453 let flags = buf.get_u8();
1454
1455 let address = if flags & 0x01 != 0 {
1456 Some(decode_string(buf)?)
1457 } else {
1458 None
1459 };
1460 let correlation_id = if flags & 0x02 != 0 {
1461 Some(buf.get_u32())
1462 } else {
1463 None
1464 };
1465
1466 Ok(Message::Error(ErrorMessage {
1467 code,
1468 message,
1469 address,
1470 correlation_id,
1471 }))
1472}
1473
1474fn decode_query(buf: &mut &[u8]) -> Result<Message> {
1475 let pattern = decode_string(buf)?;
1476 Ok(Message::Query(QueryMessage { pattern }))
1477}
1478
1479fn decode_result(buf: &mut &[u8]) -> Result<Message> {
1480 let count = buf.get_u16() as usize;
1481 let mut signals = Vec::with_capacity(count);
1482
1483 for _ in 0..count {
1484 let address = decode_string(buf)?;
1485 let sig_code = buf.get_u8();
1486 let opt_flags = buf.get_u8();
1487
1488 let datatype = if opt_flags & 0x01 != 0 {
1489 Some(decode_string(buf)?)
1490 } else {
1491 None
1492 };
1493 let access = if opt_flags & 0x02 != 0 {
1494 Some(decode_string(buf)?)
1495 } else {
1496 None
1497 };
1498
1499 signals.push(SignalDefinition {
1500 address,
1501 signal_type: signal_type_from_code(sig_code),
1502 datatype,
1503 access,
1504 meta: None,
1505 });
1506 }
1507
1508 Ok(Message::Result(ResultMessage { signals }))
1509}
1510
1511#[inline(always)]
1516fn decode_string(buf: &mut &[u8]) -> Result<String> {
1517 if buf.remaining() < 2 {
1518 return Err(Error::BufferTooSmall {
1519 needed: 2,
1520 have: buf.remaining(),
1521 });
1522 }
1523 let len = buf.get_u16() as usize;
1524 if buf.remaining() < len {
1525 return Err(Error::BufferTooSmall {
1526 needed: len,
1527 have: buf.remaining(),
1528 });
1529 }
1530 let bytes = &buf[..len];
1531 buf.advance(len);
1532 String::from_utf8(bytes.to_vec()).map_err(|e| Error::DecodeError(e.to_string()))
1533}
1534
1535#[inline]
1536fn decode_value_data(buf: &mut &[u8], vtype: u8) -> Result<Value> {
1537 match vtype {
1538 val::NULL => Ok(Value::Null),
1539 val::BOOL => {
1540 let b = buf.get_u8();
1541 Ok(Value::Bool(b != 0))
1542 }
1543 val::I8 => {
1544 let i = buf.get_i8() as i64;
1545 Ok(Value::Int(i))
1546 }
1547 val::I16 => {
1548 let i = buf.get_i16() as i64;
1549 Ok(Value::Int(i))
1550 }
1551 val::I32 => {
1552 let i = buf.get_i32() as i64;
1553 Ok(Value::Int(i))
1554 }
1555 val::I64 => {
1556 let i = buf.get_i64();
1557 Ok(Value::Int(i))
1558 }
1559 val::F32 => {
1560 let f = buf.get_f32() as f64;
1561 Ok(Value::Float(f))
1562 }
1563 val::F64 => {
1564 let f = buf.get_f64();
1565 Ok(Value::Float(f))
1566 }
1567 val::STRING => {
1568 let s = decode_string(buf)?;
1569 Ok(Value::String(s))
1570 }
1571 val::BYTES => {
1572 if buf.remaining() < 2 {
1573 return Err(Error::BufferTooSmall {
1574 needed: 2,
1575 have: buf.remaining(),
1576 });
1577 }
1578 let len = buf.get_u16() as usize;
1579 if buf.remaining() < len {
1580 return Err(Error::BufferTooSmall {
1581 needed: len,
1582 have: buf.remaining(),
1583 });
1584 }
1585 let bytes = buf[..len].to_vec();
1586 buf.advance(len);
1587 Ok(Value::Bytes(bytes))
1588 }
1589 val::ARRAY => {
1590 let count = buf.get_u16() as usize;
1591 let mut arr = Vec::with_capacity(count);
1592 for _ in 0..count {
1593 let item_type = buf.get_u8();
1594 arr.push(decode_value_data(buf, item_type)?);
1595 }
1596 Ok(Value::Array(arr))
1597 }
1598 val::MAP => {
1599 let count = buf.get_u16() as usize;
1600 let mut map = HashMap::with_capacity(count);
1601 for _ in 0..count {
1602 let key = decode_string(buf)?;
1603 let val_type = buf.get_u8();
1604 let val = decode_value_data(buf, val_type)?;
1605 map.insert(key, val);
1606 }
1607 Ok(Value::Map(map))
1608 }
1609 _ => Err(Error::DecodeError(format!(
1610 "unknown value type: 0x{:02x}",
1611 vtype
1612 ))),
1613 }
1614}
1615
1616fn signal_type_from_code(code: u8) -> SignalType {
1617 match code {
1618 sig::PARAM => SignalType::Param,
1619 sig::EVENT => SignalType::Event,
1620 sig::STREAM => SignalType::Stream,
1621 sig::GESTURE => SignalType::Gesture,
1622 sig::TIMELINE => SignalType::Timeline,
1623 _ => SignalType::Event, }
1625}
1626
1627fn gesture_phase_from_code(code: u8) -> GesturePhase {
1628 match code {
1629 phase::START => GesturePhase::Start,
1630 phase::MOVE => GesturePhase::Move,
1631 phase::END => GesturePhase::End,
1632 phase::CANCEL => GesturePhase::Cancel,
1633 _ => GesturePhase::Start, }
1635}
1636
/// Returns `true` when `byte` is a MessagePack map marker: a fixmap
/// (`0x80..=0x8F`), map16 (`0xDE`) or map32 (`0xDF`).
fn is_msgpack_map(byte: u8) -> bool {
    matches!(byte, 0x80..=0x8F | 0xDE | 0xDF)
}
1645
1646fn decode_v2_msgpack(bytes: &[u8]) -> Result<Message> {
1647 rmp_serde::from_slice(bytes).map_err(|e| Error::DecodeError(e.to_string()))
1648}
1649
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `message` into a frame and decodes it again, returning only
    /// the decoded message. Panics if either step fails.
    fn roundtrip(message: &Message) -> Message {
        let wire = encode(message).unwrap();
        decode(&wire).unwrap().0
    }

    /// A HELLO message keeps its version, name and features across a
    /// roundtrip, and its frame carries fire QoS and version 1.
    #[test]
    fn test_hello_roundtrip() {
        let hello = HelloMessage {
            version: 1,
            name: String::from("Test Client"),
            features: vec![String::from("param"), String::from("event")],
            capabilities: None,
            token: None,
        };

        let wire = encode(&Message::Hello(hello)).unwrap();
        let (message, frame) = decode(&wire).unwrap();

        if let Message::Hello(received) = message {
            assert_eq!(received.version, 1);
            assert_eq!(received.name, "Test Client");
            assert!(received.features.iter().any(|f| f == "param"));
            assert!(received.features.iter().any(|f| f == "event"));
        } else {
            panic!("Expected Hello message");
        }

        assert_eq!(frame.flags.qos, QoS::Fire);
        assert_eq!(frame.flags.version, 1);
    }

    /// A SET message keeps its address, value and revision across a
    /// roundtrip, and its frame carries confirm QoS.
    #[test]
    fn test_set_roundtrip() {
        let request = Message::Set(SetMessage {
            address: "/test/value".into(),
            value: Value::Float(0.75),
            revision: Some(42),
            lock: false,
            unlock: false,
            ttl: None,
        });

        let wire = encode(&request).unwrap();
        let (message, frame) = decode(&wire).unwrap();

        if let Message::Set(received) = message {
            assert_eq!(received.address, "/test/value");
            assert_eq!(received.value.as_f64(), Some(0.75));
            assert_eq!(received.revision, Some(42));
        } else {
            panic!("Expected Set message");
        }

        assert_eq!(frame.flags.qos, QoS::Confirm);
    }

    /// The binary SET payload must be at least 40% smaller than the named
    /// MessagePack encoding of the same message.
    #[test]
    fn test_set_size_reduction() {
        let request = Message::Set(SetMessage {
            address: "/test/value".into(),
            value: Value::Float(0.5),
            revision: Some(1),
            lock: false,
            unlock: false,
            ttl: None,
        });

        let binary_len = encode_message(&request).unwrap().len();
        let msgpack_len = rmp_serde::to_vec_named(&request).unwrap().len();

        println!("Binary payload: {} bytes", binary_len);
        println!("MessagePack payload: {} bytes", msgpack_len);

        assert!(
            binary_len < msgpack_len,
            "Binary encoding ({}) should be smaller than MessagePack ({})",
            binary_len,
            msgpack_len
        );

        // Safe from underflow: the assertion above guarantees the ratio is
        // below 100.
        let savings = 100 - (binary_len * 100 / msgpack_len);
        println!("Size reduction: {}%", savings);
        assert!(
            savings >= 40,
            "Expected at least 40% size reduction, got {}%",
            savings
        );
    }

    /// A bundle keeps its timestamp and number of inner messages across a
    /// roundtrip.
    #[test]
    fn test_bundle_roundtrip() {
        let lights = [("/light/1", 1.0), ("/light/2", 0.0)];
        let messages = lights
            .iter()
            .map(|&(address, level)| {
                Message::Set(SetMessage {
                    address: address.to_string(),
                    value: Value::Float(level),
                    revision: None,
                    lock: false,
                    unlock: false,
                    ttl: None,
                })
            })
            .collect();
        let request = Message::Bundle(BundleMessage {
            timestamp: Some(1000000),
            messages,
        });

        if let Message::Bundle(received) = roundtrip(&request) {
            assert_eq!(received.timestamp, Some(1000000));
            assert_eq!(received.messages.len(), 2);
        } else {
            panic!("Expected Bundle message");
        }
    }

    /// Each supported value kind survives a roundtrip inside a SET message.
    #[test]
    fn test_value_types() {
        let samples = vec![
            Value::Null,
            Value::Bool(true),
            Value::Int(42),
            Value::Float(1.25),
            Value::String(String::from("hello")),
            Value::Array(vec![Value::Int(1), Value::Int(2), Value::Int(3)]),
        ];

        for expected in samples {
            let request = Message::Set(SetMessage {
                address: "/test".into(),
                value: expected.clone(),
                revision: None,
                lock: false,
                unlock: false,
                ttl: None,
            });

            if let Message::Set(received) = roundtrip(&request) {
                assert_eq!(received.value, expected);
            } else {
                panic!("Expected Set message");
            }
        }
    }

    /// A payload produced by the named MessagePack encoder is still accepted
    /// by `decode_message`.
    #[test]
    fn test_backward_compat_v2_decode() {
        let legacy = Message::Set(SetMessage {
            address: "/test/value".into(),
            value: Value::Float(0.5),
            revision: Some(1),
            lock: false,
            unlock: false,
            ttl: None,
        });

        let v2_bytes = rmp_serde::to_vec_named(&legacy).unwrap();

        if let Message::Set(received) = decode_message(&v2_bytes).unwrap() {
            assert_eq!(received.address, "/test/value");
            assert_eq!(received.value.as_f64(), Some(0.5));
        } else {
            panic!("Expected Set message");
        }
    }

    /// PING and PONG each decode back to the same variant.
    #[test]
    fn test_ping_pong() {
        assert!(matches!(roundtrip(&Message::Ping), Message::Ping));
        assert!(matches!(roundtrip(&Message::Pong), Message::Pong));
    }

    /// An event PUBLISH keeps its address, signal type and timestamp across a
    /// roundtrip.
    #[test]
    fn test_publish_event() {
        let request = Message::Publish(PublishMessage {
            address: "/cue/fire".into(),
            signal: Some(SignalType::Event),
            value: None,
            payload: Some(Value::String(String::from("intro"))),
            samples: None,
            rate: None,
            id: None,
            phase: None,
            timestamp: Some(1234567890),
            timeline: None,
        });

        if let Message::Publish(received) = roundtrip(&request) {
            assert_eq!(received.address, "/cue/fire");
            assert_eq!(received.signal, Some(SignalType::Event));
            assert_eq!(received.timestamp, Some(1234567890));
        } else {
            panic!("Expected Publish message");
        }
    }

    /// A SUBSCRIBE keeps its id, pattern, signal types and `max_rate` option
    /// across a roundtrip.
    #[test]
    fn test_subscribe_roundtrip() {
        let options = SubscribeOptions {
            max_rate: Some(60),
            epsilon: Some(0.01),
            history: None,
            window: None,
        };
        let request = Message::Subscribe(SubscribeMessage {
            id: 42,
            pattern: "/lumen/scene/*/layer/**".into(),
            types: vec![SignalType::Param, SignalType::Stream],
            options: Some(options),
        });

        if let Message::Subscribe(received) = roundtrip(&request) {
            assert_eq!(received.id, 42);
            assert_eq!(received.pattern, "/lumen/scene/*/layer/**");
            assert!(received.types.contains(&SignalType::Param));
            assert!(received.types.contains(&SignalType::Stream));
            assert_eq!(received.options.as_ref().unwrap().max_rate, Some(60));
        } else {
            panic!("Expected Subscribe message");
        }
    }
}