1use crate::types::*;
14use crate::{Error, Frame, QoS, Result};
15use bytes::{Buf, BufMut, Bytes, BytesMut};
16use std::collections::HashMap;
17
18pub const ENCODING_VERSION: u8 = 1;
20
21pub mod msg {
23 pub const HELLO: u8 = 0x01;
24 pub const WELCOME: u8 = 0x02;
25 pub const ANNOUNCE: u8 = 0x03;
26 pub const SUBSCRIBE: u8 = 0x10;
27 pub const UNSUBSCRIBE: u8 = 0x11;
28 pub const PUBLISH: u8 = 0x20;
29 pub const SET: u8 = 0x21;
30 pub const GET: u8 = 0x22;
31 pub const SNAPSHOT: u8 = 0x23;
32 pub const FEDERATION_SYNC: u8 = 0x04;
33 pub const REPLAY: u8 = 0x24;
34 pub const BUNDLE: u8 = 0x30;
35 pub const SYNC: u8 = 0x40;
36 pub const PING: u8 = 0x41;
37 pub const PONG: u8 = 0x42;
38 pub const ACK: u8 = 0x50;
39 pub const ERROR: u8 = 0x51;
40 pub const QUERY: u8 = 0x60;
41 pub const RESULT: u8 = 0x61;
42}
43
44pub mod val {
46 pub const NULL: u8 = 0x00;
47 pub const BOOL: u8 = 0x01;
48 pub const I8: u8 = 0x02;
49 pub const I16: u8 = 0x03;
50 pub const I32: u8 = 0x04;
51 pub const I64: u8 = 0x05;
52 pub const F32: u8 = 0x06;
53 pub const F64: u8 = 0x07;
54 pub const STRING: u8 = 0x08;
55 pub const BYTES: u8 = 0x09;
56 pub const ARRAY: u8 = 0x0A;
57 pub const MAP: u8 = 0x0B;
58}
59
60pub mod sig {
62 pub const PARAM: u8 = 0;
63 pub const EVENT: u8 = 1;
64 pub const STREAM: u8 = 2;
65 pub const GESTURE: u8 = 3;
66 pub const TIMELINE: u8 = 4;
67}
68
69pub mod phase {
71 pub const START: u8 = 0;
72 pub const MOVE: u8 = 1;
73 pub const END: u8 = 2;
74 pub const CANCEL: u8 = 3;
75}
76
77#[inline]
83pub fn encode_message(message: &Message) -> Result<Bytes> {
84 let capacity = estimate_message_size(message);
86 let mut buf = BytesMut::with_capacity(capacity);
87 encode_message_to_buf(&mut buf, message)?;
88 Ok(buf.freeze())
89}
90
91#[inline]
93fn estimate_message_size(msg: &Message) -> usize {
94 match msg {
95 Message::Set(m) => 2 + 2 + m.address.len() + 9 + if m.revision.is_some() { 8 } else { 0 } + if m.ttl.is_some() { 4 } else { 0 },
96 Message::Publish(m) => 2 + 2 + m.address.len() + 16,
97 Message::Hello(m) => 4 + m.name.len() + 2,
98 Message::Welcome(m) => 12 + m.name.len() + m.session.len() + 4,
99 Message::Subscribe(m) => 6 + m.pattern.len() + 16,
100 Message::Bundle(m) => 12 + m.messages.len() * 48,
101 Message::Replay(m) => 4 + m.pattern.len() + 20,
102 Message::FederationSync(m) => {
103 4 + m.patterns.iter().map(|p| 2 + p.len()).sum::<usize>() + m.revisions.len() * 12 + 16
104 }
105 Message::Ping | Message::Pong => 5, _ => 64, }
108}
109
110#[inline]
112pub fn decode_message(bytes: &[u8]) -> Result<Message> {
113 if bytes.is_empty() {
114 return Err(Error::BufferTooSmall { needed: 1, have: 0 });
115 }
116
117 let first = bytes[0];
118
119 if is_msgpack_map(first) {
122 decode_v2_msgpack(bytes)
124 } else {
125 decode_v3_binary(bytes)
127 }
128}
129
130#[inline]
132pub fn encode(message: &Message) -> Result<Bytes> {
133 let payload = encode_message(message)?;
134 let mut frame = Frame::new(payload).with_qos(message.default_qos());
135 frame.flags.version = 1; frame.encode()
137}
138
139pub fn encode_with_options(
141 message: &Message,
142 qos: Option<QoS>,
143 timestamp: Option<u64>,
144) -> Result<Bytes> {
145 let payload = encode_message(message)?;
146 let mut frame = Frame::new(payload);
147 frame.flags.version = 1; if let Some(qos) = qos {
150 frame = frame.with_qos(qos);
151 } else {
152 frame = frame.with_qos(message.default_qos());
153 }
154
155 if let Some(ts) = timestamp {
156 frame = frame.with_timestamp(ts);
157 }
158
159 frame.encode()
160}
161
162#[inline]
164pub fn decode(bytes: &[u8]) -> Result<(Message, Frame)> {
165 let frame = Frame::decode(bytes)?;
166 let message = decode_message(&frame.payload)?;
167 Ok((message, frame))
168}
169
170pub fn encode_payload(message: &Message) -> Result<Vec<u8>> {
172 let bytes = encode_message(message)?;
173 Ok(bytes.to_vec())
174}
175
176pub fn decode_payload(bytes: &[u8]) -> Result<Message> {
178 decode_message(bytes)
179}
180
181fn encode_message_to_buf(buf: &mut BytesMut, msg: &Message) -> Result<()> {
186 match msg {
187 Message::Hello(m) => encode_hello(buf, m),
188 Message::Welcome(m) => encode_welcome(buf, m),
189 Message::Announce(m) => encode_announce(buf, m),
190 Message::Subscribe(m) => encode_subscribe(buf, m),
191 Message::Unsubscribe(m) => encode_unsubscribe(buf, m),
192 Message::Publish(m) => encode_publish(buf, m),
193 Message::Set(m) => encode_set(buf, m),
194 Message::Get(m) => encode_get(buf, m),
195 Message::Snapshot(m) => encode_snapshot(buf, m),
196 Message::Replay(m) => encode_replay(buf, m),
197 Message::FederationSync(m) => encode_federation_sync(buf, m),
198 Message::Bundle(m) => encode_bundle(buf, m),
199 Message::Sync(m) => encode_sync(buf, m),
200 Message::Ping => {
201 buf.put_u8(msg::PING);
202 Ok(())
203 }
204 Message::Pong => {
205 buf.put_u8(msg::PONG);
206 Ok(())
207 }
208 Message::Ack(m) => encode_ack(buf, m),
209 Message::Error(m) => encode_error(buf, m),
210 Message::Query(m) => encode_query(buf, m),
211 Message::Result(m) => encode_result(buf, m),
212 }
213}
214
215#[inline]
218fn encode_set(buf: &mut BytesMut, msg: &SetMessage) -> Result<()> {
219 buf.put_u8(msg::SET);
220
221 let vtype = value_type_code(&msg.value);
222 let mut flags = vtype & 0x0F;
223 if msg.revision.is_some() {
224 flags |= 0x80;
225 }
226 if msg.lock {
227 flags |= 0x40;
228 }
229 if msg.unlock {
230 flags |= 0x20;
231 }
232 if msg.ttl.is_some() {
233 flags |= 0x10;
234 }
235 buf.put_u8(flags);
236
237 encode_string(buf, &msg.address)?;
239
240 encode_value_data(buf, &msg.value)?;
242
243 if let Some(rev) = msg.revision {
245 buf.put_u64(rev);
246 }
247
248 if let Some(ttl) = msg.ttl {
250 let raw = match ttl {
251 Ttl::Never => 0u32,
252 Ttl::Sliding(secs) => secs & 0x7FFF_FFFF,
253 Ttl::Absolute(secs) => (secs & 0x7FFF_FFFF) | 0x8000_0000,
254 };
255 buf.put_u32(raw);
256 }
257
258 Ok(())
259}
260
261fn encode_publish(buf: &mut BytesMut, msg: &PublishMessage) -> Result<()> {
264 buf.put_u8(msg::PUBLISH);
265
266 let sig_code = msg.signal.map(signal_type_code).unwrap_or(sig::EVENT);
267 let phase_code = msg.phase.map(gesture_phase_code).unwrap_or(phase::START);
268
269 let mut flags: u8 = (sig_code & 0x07) << 5;
270 if msg.timestamp.is_some() {
271 flags |= 0x10;
272 }
273 if msg.id.is_some() {
274 flags |= 0x08;
275 }
276 flags |= phase_code & 0x07;
277 buf.put_u8(flags);
278
279 encode_string(buf, &msg.address)?;
281
282 if let Some(ref value) = msg.value {
284 buf.put_u8(1); buf.put_u8(value_type_code(value));
286 encode_value_data(buf, value)?;
287 } else if let Some(ref payload) = msg.payload {
288 buf.put_u8(1); buf.put_u8(value_type_code(payload));
290 encode_value_data(buf, payload)?;
291 } else if let Some(ref samples) = msg.samples {
292 buf.put_u8(2); buf.put_u16(samples.len() as u16);
294 for sample in samples {
295 buf.put_f64(*sample);
296 }
297 } else {
298 buf.put_u8(0); }
300
301 if let Some(ts) = msg.timestamp {
303 buf.put_u64(ts);
304 }
305
306 if let Some(id) = msg.id {
308 buf.put_u32(id);
309 }
310
311 if let Some(rate) = msg.rate {
313 buf.put_u32(rate);
314 }
315
316 Ok(())
317}
318
319fn encode_hello(buf: &mut BytesMut, msg: &HelloMessage) -> Result<()> {
321 buf.put_u8(msg::HELLO);
322 buf.put_u8(msg.version);
323
324 let mut features: u8 = 0;
326 for f in &msg.features {
327 match f.as_str() {
328 "param" => features |= 0x80,
329 "event" => features |= 0x40,
330 "stream" => features |= 0x20,
331 "gesture" => features |= 0x10,
332 "timeline" => features |= 0x08,
333 "federation" => features |= 0x04,
334 _ => {}
335 }
336 }
337 buf.put_u8(features);
338
339 encode_string(buf, &msg.name)?;
341
342 if let Some(ref token) = msg.token {
344 encode_string(buf, token)?;
345 } else {
346 buf.put_u16(0);
347 }
348
349 Ok(())
350}
351
352fn encode_welcome(buf: &mut BytesMut, msg: &WelcomeMessage) -> Result<()> {
354 buf.put_u8(msg::WELCOME);
355 buf.put_u8(msg.version);
356
357 let mut features: u8 = 0;
359 for f in &msg.features {
360 match f.as_str() {
361 "param" => features |= 0x80,
362 "event" => features |= 0x40,
363 "stream" => features |= 0x20,
364 "gesture" => features |= 0x10,
365 "timeline" => features |= 0x08,
366 _ => {}
367 }
368 }
369 buf.put_u8(features);
370
371 buf.put_u64(msg.time);
373
374 encode_string(buf, &msg.session)?;
376
377 encode_string(buf, &msg.name)?;
379
380 if let Some(ref token) = msg.token {
382 encode_string(buf, token)?;
383 } else {
384 buf.put_u16(0);
385 }
386
387 Ok(())
388}
389
390fn encode_announce(buf: &mut BytesMut, msg: &AnnounceMessage) -> Result<()> {
392 buf.put_u8(msg::ANNOUNCE);
393
394 encode_string(buf, &msg.namespace)?;
395 buf.put_u16(msg.signals.len() as u16);
396
397 for sig in &msg.signals {
398 encode_string(buf, &sig.address)?;
399 buf.put_u8(signal_type_code(sig.signal_type));
400
401 let mut opt_flags: u8 = 0;
403 if sig.datatype.is_some() {
404 opt_flags |= 0x01;
405 }
406 if sig.access.is_some() {
407 opt_flags |= 0x02;
408 }
409 if sig.meta.is_some() {
410 opt_flags |= 0x04;
411 }
412 buf.put_u8(opt_flags);
413
414 if let Some(ref dt) = sig.datatype {
415 encode_string(buf, dt)?;
416 }
417 if let Some(ref access) = sig.access {
418 encode_string(buf, access)?;
419 }
420 if let Some(ref meta) = sig.meta {
421 let mut meta_flags: u8 = 0;
423 if meta.unit.is_some() {
424 meta_flags |= 0x01;
425 }
426 if meta.range.is_some() {
427 meta_flags |= 0x02;
428 }
429 if meta.default.is_some() {
430 meta_flags |= 0x04;
431 }
432 if meta.description.is_some() {
433 meta_flags |= 0x08;
434 }
435 buf.put_u8(meta_flags);
436
437 if let Some(ref unit) = meta.unit {
438 encode_string(buf, unit)?;
439 }
440 if let Some((min, max)) = meta.range {
441 buf.put_f64(min);
442 buf.put_f64(max);
443 }
444 if let Some(ref default) = meta.default {
445 buf.put_u8(value_type_code(default));
446 encode_value_data(buf, default)?;
447 }
448 if let Some(ref desc) = meta.description {
449 encode_string(buf, desc)?;
450 }
451 }
452 }
453
454 Ok(())
455}
456
457fn encode_subscribe(buf: &mut BytesMut, msg: &SubscribeMessage) -> Result<()> {
459 buf.put_u8(msg::SUBSCRIBE);
460 buf.put_u32(msg.id);
461
462 encode_string(buf, &msg.pattern)?;
463
464 let mut type_mask: u8 = 0;
466 if msg.types.is_empty() {
467 type_mask = 0xFF; } else {
469 for t in &msg.types {
470 match t {
471 SignalType::Param => type_mask |= 0x01,
472 SignalType::Event => type_mask |= 0x02,
473 SignalType::Stream => type_mask |= 0x04,
474 SignalType::Gesture => type_mask |= 0x08,
475 SignalType::Timeline => type_mask |= 0x10,
476 }
477 }
478 }
479 buf.put_u8(type_mask);
480
481 if let Some(ref opts) = msg.options {
483 let mut opt_flags: u8 = 0;
484 if opts.max_rate.is_some() {
485 opt_flags |= 0x01;
486 }
487 if opts.epsilon.is_some() {
488 opt_flags |= 0x02;
489 }
490 if opts.history.is_some() {
491 opt_flags |= 0x04;
492 }
493 if opts.window.is_some() {
494 opt_flags |= 0x08;
495 }
496 buf.put_u8(opt_flags);
497
498 if let Some(rate) = opts.max_rate {
499 buf.put_u32(rate);
500 }
501 if let Some(eps) = opts.epsilon {
502 buf.put_f64(eps);
503 }
504 if let Some(hist) = opts.history {
505 buf.put_u32(hist);
506 }
507 if let Some(win) = opts.window {
508 buf.put_u32(win);
509 }
510 } else {
511 buf.put_u8(0); }
513
514 Ok(())
515}
516
517fn encode_unsubscribe(buf: &mut BytesMut, msg: &UnsubscribeMessage) -> Result<()> {
519 buf.put_u8(msg::UNSUBSCRIBE);
520 buf.put_u32(msg.id);
521 Ok(())
522}
523
524fn encode_get(buf: &mut BytesMut, msg: &GetMessage) -> Result<()> {
526 buf.put_u8(msg::GET);
527 encode_string(buf, &msg.address)?;
528 Ok(())
529}
530
531fn encode_snapshot(buf: &mut BytesMut, msg: &SnapshotMessage) -> Result<()> {
533 buf.put_u8(msg::SNAPSHOT);
534 buf.put_u16(msg.params.len() as u16);
535
536 for param in &msg.params {
537 encode_string(buf, ¶m.address)?;
538 buf.put_u8(value_type_code(¶m.value));
539 encode_value_data(buf, ¶m.value)?;
540 buf.put_u64(param.revision);
541
542 let mut opt_flags: u8 = 0;
543 if param.writer.is_some() {
544 opt_flags |= 0x01;
545 }
546 if param.timestamp.is_some() {
547 opt_flags |= 0x02;
548 }
549 buf.put_u8(opt_flags);
550
551 if let Some(ref writer) = param.writer {
552 encode_string(buf, writer)?;
553 }
554 if let Some(ts) = param.timestamp {
555 buf.put_u64(ts);
556 }
557 }
558
559 Ok(())
560}
561
562fn encode_replay(buf: &mut BytesMut, msg: &ReplayMessage) -> Result<()> {
565 buf.put_u8(msg::REPLAY);
566
567 let mut flags: u8 = 0;
568 if msg.from.is_some() {
569 flags |= 0x80;
570 }
571 if msg.to.is_some() {
572 flags |= 0x40;
573 }
574 if msg.limit.is_some() {
575 flags |= 0x20;
576 }
577 buf.put_u8(flags);
578
579 encode_string(buf, &msg.pattern)?;
580
581 if let Some(from) = msg.from {
582 buf.put_u64(from);
583 }
584 if let Some(to) = msg.to {
585 buf.put_u64(to);
586 }
587 if let Some(limit) = msg.limit {
588 buf.put_u32(limit);
589 }
590
591 let mut type_mask: u8 = 0;
593 if msg.types.is_empty() {
594 type_mask = 0xFF; } else {
596 for t in &msg.types {
597 match t {
598 SignalType::Param => type_mask |= 0x01,
599 SignalType::Event => type_mask |= 0x02,
600 SignalType::Stream => type_mask |= 0x04,
601 SignalType::Gesture => type_mask |= 0x08,
602 SignalType::Timeline => type_mask |= 0x10,
603 }
604 }
605 }
606 buf.put_u8(type_mask);
607
608 Ok(())
609}
610
611fn encode_federation_sync(buf: &mut BytesMut, msg: &FederationSyncMessage) -> Result<()> {
614 buf.put_u8(msg::FEDERATION_SYNC);
615 buf.put_u8(msg.op as u8);
616
617 buf.put_u16(msg.patterns.len() as u16);
619 for pattern in &msg.patterns {
620 encode_string(buf, pattern)?;
621 }
622
623 buf.put_u16(msg.revisions.len() as u16);
625 for (key, val) in &msg.revisions {
626 encode_string(buf, key)?;
627 buf.put_u64(*val);
628 }
629
630 let mut flags: u8 = 0;
632 if msg.since_revision.is_some() {
633 flags |= 0x80;
634 }
635 if msg.origin.is_some() {
636 flags |= 0x40;
637 }
638 buf.put_u8(flags);
639
640 if let Some(since) = msg.since_revision {
641 buf.put_u64(since);
642 }
643 if let Some(ref origin) = msg.origin {
644 encode_string(buf, origin)?;
645 }
646
647 Ok(())
648}
649
650fn encode_bundle(buf: &mut BytesMut, msg: &BundleMessage) -> Result<()> {
652 buf.put_u8(msg::BUNDLE);
653
654 let mut flags: u8 = 0;
655 if msg.timestamp.is_some() {
656 flags |= 0x80;
657 }
658 buf.put_u8(flags);
659
660 buf.put_u16(msg.messages.len() as u16);
661
662 if let Some(ts) = msg.timestamp {
663 buf.put_u64(ts);
664 }
665
666 for inner_msg in &msg.messages {
668 let mut inner_buf = BytesMut::with_capacity(64);
669 encode_message_to_buf(&mut inner_buf, inner_msg)?;
670 buf.put_u16(inner_buf.len() as u16);
671 buf.extend_from_slice(&inner_buf);
672 }
673
674 Ok(())
675}
676
677fn encode_sync(buf: &mut BytesMut, msg: &SyncMessage) -> Result<()> {
679 buf.put_u8(msg::SYNC);
680
681 let mut flags: u8 = 0;
682 if msg.t2.is_some() {
683 flags |= 0x01;
684 }
685 if msg.t3.is_some() {
686 flags |= 0x02;
687 }
688 buf.put_u8(flags);
689
690 buf.put_u64(msg.t1);
691 if let Some(t2) = msg.t2 {
692 buf.put_u64(t2);
693 }
694 if let Some(t3) = msg.t3 {
695 buf.put_u64(t3);
696 }
697
698 Ok(())
699}
700
701fn encode_ack(buf: &mut BytesMut, msg: &AckMessage) -> Result<()> {
703 buf.put_u8(msg::ACK);
704
705 let mut flags: u8 = 0;
706 if msg.address.is_some() {
707 flags |= 0x01;
708 }
709 if msg.revision.is_some() {
710 flags |= 0x02;
711 }
712 if msg.locked.is_some() {
713 flags |= 0x04;
714 }
715 if msg.holder.is_some() {
716 flags |= 0x08;
717 }
718 if msg.correlation_id.is_some() {
719 flags |= 0x10;
720 }
721 buf.put_u8(flags);
722
723 if let Some(ref addr) = msg.address {
724 encode_string(buf, addr)?;
725 }
726 if let Some(rev) = msg.revision {
727 buf.put_u64(rev);
728 }
729 if let Some(locked) = msg.locked {
730 buf.put_u8(if locked { 1 } else { 0 });
731 }
732 if let Some(ref holder) = msg.holder {
733 encode_string(buf, holder)?;
734 }
735 if let Some(corr) = msg.correlation_id {
736 buf.put_u32(corr);
737 }
738
739 Ok(())
740}
741
742fn encode_error(buf: &mut BytesMut, msg: &ErrorMessage) -> Result<()> {
744 buf.put_u8(msg::ERROR);
745 buf.put_u16(msg.code);
746 encode_string(buf, &msg.message)?;
747
748 let mut flags: u8 = 0;
749 if msg.address.is_some() {
750 flags |= 0x01;
751 }
752 if msg.correlation_id.is_some() {
753 flags |= 0x02;
754 }
755 buf.put_u8(flags);
756
757 if let Some(ref addr) = msg.address {
758 encode_string(buf, addr)?;
759 }
760 if let Some(corr) = msg.correlation_id {
761 buf.put_u32(corr);
762 }
763
764 Ok(())
765}
766
767fn encode_query(buf: &mut BytesMut, msg: &QueryMessage) -> Result<()> {
769 buf.put_u8(msg::QUERY);
770 encode_string(buf, &msg.pattern)?;
771 Ok(())
772}
773
774fn encode_result(buf: &mut BytesMut, msg: &ResultMessage) -> Result<()> {
776 buf.put_u8(msg::RESULT);
777 buf.put_u16(msg.signals.len() as u16);
778
779 for sig in &msg.signals {
780 encode_string(buf, &sig.address)?;
781 buf.put_u8(signal_type_code(sig.signal_type));
782
783 let mut opt_flags: u8 = 0;
784 if sig.datatype.is_some() {
785 opt_flags |= 0x01;
786 }
787 if sig.access.is_some() {
788 opt_flags |= 0x02;
789 }
790 buf.put_u8(opt_flags);
791
792 if let Some(ref dt) = sig.datatype {
793 encode_string(buf, dt)?;
794 }
795 if let Some(ref access) = sig.access {
796 encode_string(buf, access)?;
797 }
798 }
799
800 Ok(())
801}
802
803#[inline(always)]
808fn encode_string(buf: &mut BytesMut, s: &str) -> Result<()> {
809 let bytes = s.as_bytes();
810 if bytes.len() > u16::MAX as usize {
811 return Err(Error::PayloadTooLarge(bytes.len()));
812 }
813 buf.put_u16(bytes.len() as u16);
814 buf.extend_from_slice(bytes);
815 Ok(())
816}
817
818#[inline]
819fn encode_value_data(buf: &mut BytesMut, value: &Value) -> Result<()> {
820 match value {
821 Value::Null => {} Value::Bool(b) => buf.put_u8(if *b { 1 } else { 0 }),
823 Value::Int(i) => buf.put_i64(*i),
824 Value::Float(f) => buf.put_f64(*f),
825 Value::String(s) => encode_string(buf, s)?,
826 Value::Bytes(b) => {
827 buf.put_u16(b.len() as u16);
828 buf.extend_from_slice(b);
829 }
830 Value::Array(arr) => {
831 buf.put_u16(arr.len() as u16);
832 for item in arr {
833 buf.put_u8(value_type_code(item));
834 encode_value_data(buf, item)?;
835 }
836 }
837 Value::Map(map) => {
838 buf.put_u16(map.len() as u16);
839 for (key, val) in map {
840 encode_string(buf, key)?;
841 buf.put_u8(value_type_code(val));
842 encode_value_data(buf, val)?;
843 }
844 }
845 }
846 Ok(())
847}
848
849#[inline(always)]
850fn value_type_code(value: &Value) -> u8 {
851 match value {
852 Value::Null => val::NULL,
853 Value::Bool(_) => val::BOOL,
854 Value::Int(_) => val::I64,
855 Value::Float(_) => val::F64,
856 Value::String(_) => val::STRING,
857 Value::Bytes(_) => val::BYTES,
858 Value::Array(_) => val::ARRAY,
859 Value::Map(_) => val::MAP,
860 }
861}
862
863fn signal_type_code(sig: SignalType) -> u8 {
864 match sig {
865 SignalType::Param => sig::PARAM,
866 SignalType::Event => sig::EVENT,
867 SignalType::Stream => sig::STREAM,
868 SignalType::Gesture => sig::GESTURE,
869 SignalType::Timeline => sig::TIMELINE,
870 }
871}
872
873fn gesture_phase_code(phase: GesturePhase) -> u8 {
874 match phase {
875 GesturePhase::Start => phase::START,
876 GesturePhase::Move => phase::MOVE,
877 GesturePhase::End => phase::END,
878 GesturePhase::Cancel => phase::CANCEL,
879 }
880}
881
882fn decode_v3_binary(bytes: &[u8]) -> Result<Message> {
887 if bytes.is_empty() {
888 return Err(Error::BufferTooSmall { needed: 1, have: 0 });
889 }
890
891 let mut buf = bytes;
892 let msg_type = buf.get_u8();
893
894 match msg_type {
895 msg::HELLO => decode_hello(&mut buf),
896 msg::WELCOME => decode_welcome(&mut buf),
897 msg::ANNOUNCE => decode_announce(&mut buf),
898 msg::SUBSCRIBE => decode_subscribe(&mut buf),
899 msg::UNSUBSCRIBE => decode_unsubscribe(&mut buf),
900 msg::PUBLISH => decode_publish(&mut buf),
901 msg::SET => decode_set(&mut buf),
902 msg::GET => decode_get(&mut buf),
903 msg::SNAPSHOT => decode_snapshot(&mut buf),
904 msg::REPLAY => decode_replay(&mut buf),
905 msg::FEDERATION_SYNC => decode_federation_sync(&mut buf),
906 msg::BUNDLE => decode_bundle(&mut buf),
907 msg::SYNC => decode_sync(&mut buf),
908 msg::PING => Ok(Message::Ping),
909 msg::PONG => Ok(Message::Pong),
910 msg::ACK => decode_ack(&mut buf),
911 msg::ERROR => decode_error(&mut buf),
912 msg::QUERY => decode_query(&mut buf),
913 msg::RESULT => decode_result(&mut buf),
914 _ => Err(Error::UnknownMessageType(msg_type)),
915 }
916}
917
918#[inline]
919fn decode_set(buf: &mut &[u8]) -> Result<Message> {
920 let flags = buf.get_u8();
921 let vtype = flags & 0x0F;
922 let has_rev = (flags & 0x80) != 0;
923 let lock = (flags & 0x40) != 0;
924 let unlock = (flags & 0x20) != 0;
925 let has_ttl = (flags & 0x10) != 0;
926
927 let address = decode_string(buf)?;
928 let value = decode_value_data(buf, vtype)?;
929
930 let revision = if has_rev { Some(buf.get_u64()) } else { None };
931
932 let ttl = if has_ttl {
933 let raw = buf.get_u32();
934 if raw == 0 {
935 Some(Ttl::Never)
936 } else if raw & 0x8000_0000 != 0 {
937 Some(Ttl::Absolute(raw & 0x7FFF_FFFF))
938 } else {
939 Some(Ttl::Sliding(raw))
940 }
941 } else {
942 None
943 };
944
945 Ok(Message::Set(SetMessage {
946 address,
947 value,
948 revision,
949 lock,
950 unlock,
951 ttl,
952 }))
953}
954
955fn decode_publish(buf: &mut &[u8]) -> Result<Message> {
956 let flags = buf.get_u8();
957 let sig_code = (flags >> 5) & 0x07;
958 let has_ts = (flags & 0x10) != 0;
959 let has_id = (flags & 0x08) != 0;
960 let phase_code = flags & 0x07;
961
962 let address = decode_string(buf)?;
963
964 let value_indicator = buf.get_u8();
966 let (value, payload, samples) = match value_indicator {
967 0 => (None, None, None),
968 1 => {
969 let vtype = buf.get_u8();
970 let v = decode_value_data(buf, vtype)?;
971 (Some(v), None, None)
972 }
973 2 => {
974 let count = buf.get_u16() as usize;
975 let mut s = Vec::with_capacity(count);
976 for _ in 0..count {
977 s.push(buf.get_f64());
978 }
979 (None, None, Some(s))
980 }
981 _ => (None, None, None),
982 };
983
984 let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
985 let id = if has_id { Some(buf.get_u32()) } else { None };
986
987 let rate = if buf.remaining() >= 4 {
989 Some(buf.get_u32())
990 } else {
991 None
992 };
993
994 let signal = Some(signal_type_from_code(sig_code));
995 let phase = Some(gesture_phase_from_code(phase_code));
996
997 Ok(Message::Publish(PublishMessage {
998 address,
999 signal,
1000 value,
1001 payload,
1002 samples,
1003 rate,
1004 id,
1005 phase,
1006 timestamp,
1007 timeline: None, }))
1009}
1010
1011fn decode_hello(buf: &mut &[u8]) -> Result<Message> {
1012 let version = buf.get_u8();
1013 let feature_flags = buf.get_u8();
1014
1015 let mut features = Vec::new();
1016 if feature_flags & 0x80 != 0 {
1017 features.push("param".to_string());
1018 }
1019 if feature_flags & 0x40 != 0 {
1020 features.push("event".to_string());
1021 }
1022 if feature_flags & 0x20 != 0 {
1023 features.push("stream".to_string());
1024 }
1025 if feature_flags & 0x10 != 0 {
1026 features.push("gesture".to_string());
1027 }
1028 if feature_flags & 0x08 != 0 {
1029 features.push("timeline".to_string());
1030 }
1031 if feature_flags & 0x04 != 0 {
1032 features.push("federation".to_string());
1033 }
1034
1035 let name = decode_string(buf)?;
1036 let token_str = decode_string(buf)?;
1037 let token = if token_str.is_empty() {
1038 None
1039 } else {
1040 Some(token_str)
1041 };
1042
1043 Ok(Message::Hello(HelloMessage {
1044 version,
1045 name,
1046 features,
1047 capabilities: None,
1048 token,
1049 }))
1050}
1051
1052fn decode_welcome(buf: &mut &[u8]) -> Result<Message> {
1053 let version = buf.get_u8();
1054 let feature_flags = buf.get_u8();
1055
1056 let mut features = Vec::new();
1057 if feature_flags & 0x80 != 0 {
1058 features.push("param".to_string());
1059 }
1060 if feature_flags & 0x40 != 0 {
1061 features.push("event".to_string());
1062 }
1063 if feature_flags & 0x20 != 0 {
1064 features.push("stream".to_string());
1065 }
1066 if feature_flags & 0x10 != 0 {
1067 features.push("gesture".to_string());
1068 }
1069 if feature_flags & 0x08 != 0 {
1070 features.push("timeline".to_string());
1071 }
1072
1073 let time = buf.get_u64();
1074 let session = decode_string(buf)?;
1075 let name = decode_string(buf)?;
1076
1077 let token_str = decode_string(buf)?;
1078 let token = if token_str.is_empty() {
1079 None
1080 } else {
1081 Some(token_str)
1082 };
1083
1084 Ok(Message::Welcome(WelcomeMessage {
1085 version,
1086 session,
1087 name,
1088 features,
1089 time,
1090 token,
1091 }))
1092}
1093
1094fn decode_announce(buf: &mut &[u8]) -> Result<Message> {
1095 let namespace = decode_string(buf)?;
1096 let count = buf.get_u16() as usize;
1097
1098 let mut signals = Vec::with_capacity(count);
1099 for _ in 0..count {
1100 let address = decode_string(buf)?;
1101 let sig_code = buf.get_u8();
1102 let opt_flags = buf.get_u8();
1103
1104 let datatype = if opt_flags & 0x01 != 0 {
1105 Some(decode_string(buf)?)
1106 } else {
1107 None
1108 };
1109 let access = if opt_flags & 0x02 != 0 {
1110 Some(decode_string(buf)?)
1111 } else {
1112 None
1113 };
1114
1115 let meta = if opt_flags & 0x04 != 0 {
1116 let meta_flags = buf.get_u8();
1117
1118 let unit = if meta_flags & 0x01 != 0 {
1119 Some(decode_string(buf)?)
1120 } else {
1121 None
1122 };
1123 let range = if meta_flags & 0x02 != 0 {
1124 let min = buf.get_f64();
1125 let max = buf.get_f64();
1126 Some((min, max))
1127 } else {
1128 None
1129 };
1130 let default = if meta_flags & 0x04 != 0 {
1131 let vtype = buf.get_u8();
1132 Some(decode_value_data(buf, vtype)?)
1133 } else {
1134 None
1135 };
1136 let description = if meta_flags & 0x08 != 0 {
1137 Some(decode_string(buf)?)
1138 } else {
1139 None
1140 };
1141
1142 Some(SignalMeta {
1143 unit,
1144 range,
1145 default,
1146 description,
1147 })
1148 } else {
1149 None
1150 };
1151
1152 signals.push(SignalDefinition {
1153 address,
1154 signal_type: signal_type_from_code(sig_code),
1155 datatype,
1156 access,
1157 meta,
1158 });
1159 }
1160
1161 Ok(Message::Announce(AnnounceMessage {
1162 namespace,
1163 signals,
1164 meta: None,
1165 }))
1166}
1167
1168fn decode_subscribe(buf: &mut &[u8]) -> Result<Message> {
1169 let id = buf.get_u32();
1170 let pattern = decode_string(buf)?;
1171 let type_mask = buf.get_u8();
1172
1173 let mut types = Vec::new();
1174 if type_mask == 0xFF {
1175 } else {
1177 if type_mask & 0x01 != 0 {
1178 types.push(SignalType::Param);
1179 }
1180 if type_mask & 0x02 != 0 {
1181 types.push(SignalType::Event);
1182 }
1183 if type_mask & 0x04 != 0 {
1184 types.push(SignalType::Stream);
1185 }
1186 if type_mask & 0x08 != 0 {
1187 types.push(SignalType::Gesture);
1188 }
1189 if type_mask & 0x10 != 0 {
1190 types.push(SignalType::Timeline);
1191 }
1192 }
1193
1194 let opt_flags = buf.get_u8();
1195 let options = if opt_flags != 0 {
1196 let max_rate = if opt_flags & 0x01 != 0 {
1197 Some(buf.get_u32())
1198 } else {
1199 None
1200 };
1201 let epsilon = if opt_flags & 0x02 != 0 {
1202 Some(buf.get_f64())
1203 } else {
1204 None
1205 };
1206 let history = if opt_flags & 0x04 != 0 {
1207 Some(buf.get_u32())
1208 } else {
1209 None
1210 };
1211 let window = if opt_flags & 0x08 != 0 {
1212 Some(buf.get_u32())
1213 } else {
1214 None
1215 };
1216
1217 Some(SubscribeOptions {
1218 max_rate,
1219 epsilon,
1220 history,
1221 window,
1222 })
1223 } else {
1224 None
1225 };
1226
1227 Ok(Message::Subscribe(SubscribeMessage {
1228 id,
1229 pattern,
1230 types,
1231 options,
1232 }))
1233}
1234
1235fn decode_unsubscribe(buf: &mut &[u8]) -> Result<Message> {
1236 let id = buf.get_u32();
1237 Ok(Message::Unsubscribe(UnsubscribeMessage { id }))
1238}
1239
1240fn decode_get(buf: &mut &[u8]) -> Result<Message> {
1241 let address = decode_string(buf)?;
1242 Ok(Message::Get(GetMessage { address }))
1243}
1244
1245fn decode_snapshot(buf: &mut &[u8]) -> Result<Message> {
1246 let count = buf.get_u16() as usize;
1247 let mut params = Vec::with_capacity(count);
1248
1249 for _ in 0..count {
1250 let address = decode_string(buf)?;
1251 let vtype = buf.get_u8();
1252 let value = decode_value_data(buf, vtype)?;
1253 let revision = buf.get_u64();
1254 let opt_flags = buf.get_u8();
1255
1256 let writer = if opt_flags & 0x01 != 0 {
1257 Some(decode_string(buf)?)
1258 } else {
1259 None
1260 };
1261 let timestamp = if opt_flags & 0x02 != 0 {
1262 Some(buf.get_u64())
1263 } else {
1264 None
1265 };
1266
1267 params.push(ParamValue {
1268 address,
1269 value,
1270 revision,
1271 writer,
1272 timestamp,
1273 });
1274 }
1275
1276 Ok(Message::Snapshot(SnapshotMessage { params }))
1277}
1278
1279fn decode_replay(buf: &mut &[u8]) -> Result<Message> {
1280 let flags = buf.get_u8();
1281 let has_from = (flags & 0x80) != 0;
1282 let has_to = (flags & 0x40) != 0;
1283 let has_limit = (flags & 0x20) != 0;
1284
1285 let pattern = decode_string(buf)?;
1286
1287 let from = if has_from { Some(buf.get_u64()) } else { None };
1288 let to = if has_to { Some(buf.get_u64()) } else { None };
1289 let limit = if has_limit { Some(buf.get_u32()) } else { None };
1290
1291 let type_mask = buf.get_u8();
1292 let mut types = Vec::new();
1293 if type_mask != 0xFF {
1294 if type_mask & 0x01 != 0 {
1295 types.push(SignalType::Param);
1296 }
1297 if type_mask & 0x02 != 0 {
1298 types.push(SignalType::Event);
1299 }
1300 if type_mask & 0x04 != 0 {
1301 types.push(SignalType::Stream);
1302 }
1303 if type_mask & 0x08 != 0 {
1304 types.push(SignalType::Gesture);
1305 }
1306 if type_mask & 0x10 != 0 {
1307 types.push(SignalType::Timeline);
1308 }
1309 }
1310
1311 Ok(Message::Replay(ReplayMessage {
1312 pattern,
1313 from,
1314 to,
1315 limit,
1316 types,
1317 }))
1318}
1319
1320fn decode_federation_sync(buf: &mut &[u8]) -> Result<Message> {
1321 let op_byte = buf.get_u8();
1322 let op = match op_byte {
1323 0x01 => FederationOp::DeclareNamespaces,
1324 0x02 => FederationOp::RequestSync,
1325 0x03 => FederationOp::RevisionVector,
1326 0x04 => FederationOp::SyncComplete,
1327 _ => return Err(Error::DecodeError("unknown federation op".into())),
1328 };
1329
1330 let pattern_count = buf.get_u16() as usize;
1332 let mut patterns = Vec::with_capacity(pattern_count);
1333 for _ in 0..pattern_count {
1334 patterns.push(decode_string(buf)?);
1335 }
1336
1337 let revision_count = buf.get_u16() as usize;
1339 let mut revisions = std::collections::HashMap::with_capacity(revision_count);
1340 for _ in 0..revision_count {
1341 let key = decode_string(buf)?;
1342 let val = buf.get_u64();
1343 revisions.insert(key, val);
1344 }
1345
1346 let flags = buf.get_u8();
1348 let since_revision = if flags & 0x80 != 0 {
1349 Some(buf.get_u64())
1350 } else {
1351 None
1352 };
1353 let origin = if flags & 0x40 != 0 {
1354 Some(decode_string(buf)?)
1355 } else {
1356 None
1357 };
1358
1359 Ok(Message::FederationSync(FederationSyncMessage {
1360 op,
1361 patterns,
1362 revisions,
1363 since_revision,
1364 origin,
1365 }))
1366}
1367
1368fn decode_bundle(buf: &mut &[u8]) -> Result<Message> {
1369 let flags = buf.get_u8();
1370 let has_ts = (flags & 0x80) != 0;
1371 let count = buf.get_u16() as usize;
1372
1373 let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
1374
1375 let mut messages = Vec::with_capacity(count);
1376 for _ in 0..count {
1377 let len = buf.get_u16() as usize;
1378 let inner_bytes = &buf[..len];
1379 buf.advance(len);
1380 messages.push(decode_v3_binary(inner_bytes)?);
1381 }
1382
1383 Ok(Message::Bundle(BundleMessage {
1384 timestamp,
1385 messages,
1386 }))
1387}
1388
1389fn decode_sync(buf: &mut &[u8]) -> Result<Message> {
1390 let flags = buf.get_u8();
1391 let t1 = buf.get_u64();
1392 let t2 = if flags & 0x01 != 0 {
1393 Some(buf.get_u64())
1394 } else {
1395 None
1396 };
1397 let t3 = if flags & 0x02 != 0 {
1398 Some(buf.get_u64())
1399 } else {
1400 None
1401 };
1402
1403 Ok(Message::Sync(SyncMessage { t1, t2, t3 }))
1404}
1405
1406fn decode_ack(buf: &mut &[u8]) -> Result<Message> {
1407 let flags = buf.get_u8();
1408
1409 let address = if flags & 0x01 != 0 {
1410 Some(decode_string(buf)?)
1411 } else {
1412 None
1413 };
1414 let revision = if flags & 0x02 != 0 {
1415 Some(buf.get_u64())
1416 } else {
1417 None
1418 };
1419 let locked = if flags & 0x04 != 0 {
1420 Some(buf.get_u8() != 0)
1421 } else {
1422 None
1423 };
1424 let holder = if flags & 0x08 != 0 {
1425 Some(decode_string(buf)?)
1426 } else {
1427 None
1428 };
1429 let correlation_id = if flags & 0x10 != 0 {
1430 Some(buf.get_u32())
1431 } else {
1432 None
1433 };
1434
1435 Ok(Message::Ack(AckMessage {
1436 address,
1437 revision,
1438 locked,
1439 holder,
1440 correlation_id,
1441 }))
1442}
1443
1444fn decode_error(buf: &mut &[u8]) -> Result<Message> {
1445 let code = buf.get_u16();
1446 let message = decode_string(buf)?;
1447 let flags = buf.get_u8();
1448
1449 let address = if flags & 0x01 != 0 {
1450 Some(decode_string(buf)?)
1451 } else {
1452 None
1453 };
1454 let correlation_id = if flags & 0x02 != 0 {
1455 Some(buf.get_u32())
1456 } else {
1457 None
1458 };
1459
1460 Ok(Message::Error(ErrorMessage {
1461 code,
1462 message,
1463 address,
1464 correlation_id,
1465 }))
1466}
1467
1468fn decode_query(buf: &mut &[u8]) -> Result<Message> {
1469 let pattern = decode_string(buf)?;
1470 Ok(Message::Query(QueryMessage { pattern }))
1471}
1472
1473fn decode_result(buf: &mut &[u8]) -> Result<Message> {
1474 let count = buf.get_u16() as usize;
1475 let mut signals = Vec::with_capacity(count);
1476
1477 for _ in 0..count {
1478 let address = decode_string(buf)?;
1479 let sig_code = buf.get_u8();
1480 let opt_flags = buf.get_u8();
1481
1482 let datatype = if opt_flags & 0x01 != 0 {
1483 Some(decode_string(buf)?)
1484 } else {
1485 None
1486 };
1487 let access = if opt_flags & 0x02 != 0 {
1488 Some(decode_string(buf)?)
1489 } else {
1490 None
1491 };
1492
1493 signals.push(SignalDefinition {
1494 address,
1495 signal_type: signal_type_from_code(sig_code),
1496 datatype,
1497 access,
1498 meta: None,
1499 });
1500 }
1501
1502 Ok(Message::Result(ResultMessage { signals }))
1503}
1504
1505#[inline(always)]
1510fn decode_string(buf: &mut &[u8]) -> Result<String> {
1511 if buf.remaining() < 2 {
1512 return Err(Error::BufferTooSmall {
1513 needed: 2,
1514 have: buf.remaining(),
1515 });
1516 }
1517 let len = buf.get_u16() as usize;
1518 if buf.remaining() < len {
1519 return Err(Error::BufferTooSmall {
1520 needed: len,
1521 have: buf.remaining(),
1522 });
1523 }
1524 let bytes = &buf[..len];
1525 buf.advance(len);
1526 String::from_utf8(bytes.to_vec()).map_err(|e| Error::DecodeError(e.to_string()))
1527}
1528
1529#[inline]
1530fn decode_value_data(buf: &mut &[u8], vtype: u8) -> Result<Value> {
1531 match vtype {
1532 val::NULL => Ok(Value::Null),
1533 val::BOOL => {
1534 let b = buf.get_u8();
1535 Ok(Value::Bool(b != 0))
1536 }
1537 val::I8 => {
1538 let i = buf.get_i8() as i64;
1539 Ok(Value::Int(i))
1540 }
1541 val::I16 => {
1542 let i = buf.get_i16() as i64;
1543 Ok(Value::Int(i))
1544 }
1545 val::I32 => {
1546 let i = buf.get_i32() as i64;
1547 Ok(Value::Int(i))
1548 }
1549 val::I64 => {
1550 let i = buf.get_i64();
1551 Ok(Value::Int(i))
1552 }
1553 val::F32 => {
1554 let f = buf.get_f32() as f64;
1555 Ok(Value::Float(f))
1556 }
1557 val::F64 => {
1558 let f = buf.get_f64();
1559 Ok(Value::Float(f))
1560 }
1561 val::STRING => {
1562 let s = decode_string(buf)?;
1563 Ok(Value::String(s))
1564 }
1565 val::BYTES => {
1566 if buf.remaining() < 2 {
1567 return Err(Error::BufferTooSmall {
1568 needed: 2,
1569 have: buf.remaining(),
1570 });
1571 }
1572 let len = buf.get_u16() as usize;
1573 if buf.remaining() < len {
1574 return Err(Error::BufferTooSmall {
1575 needed: len,
1576 have: buf.remaining(),
1577 });
1578 }
1579 let bytes = buf[..len].to_vec();
1580 buf.advance(len);
1581 Ok(Value::Bytes(bytes))
1582 }
1583 val::ARRAY => {
1584 let count = buf.get_u16() as usize;
1585 let mut arr = Vec::with_capacity(count);
1586 for _ in 0..count {
1587 let item_type = buf.get_u8();
1588 arr.push(decode_value_data(buf, item_type)?);
1589 }
1590 Ok(Value::Array(arr))
1591 }
1592 val::MAP => {
1593 let count = buf.get_u16() as usize;
1594 let mut map = HashMap::with_capacity(count);
1595 for _ in 0..count {
1596 let key = decode_string(buf)?;
1597 let val_type = buf.get_u8();
1598 let val = decode_value_data(buf, val_type)?;
1599 map.insert(key, val);
1600 }
1601 Ok(Value::Map(map))
1602 }
1603 _ => Err(Error::DecodeError(format!(
1604 "unknown value type: 0x{:02x}",
1605 vtype
1606 ))),
1607 }
1608}
1609
1610fn signal_type_from_code(code: u8) -> SignalType {
1611 match code {
1612 sig::PARAM => SignalType::Param,
1613 sig::EVENT => SignalType::Event,
1614 sig::STREAM => SignalType::Stream,
1615 sig::GESTURE => SignalType::Gesture,
1616 sig::TIMELINE => SignalType::Timeline,
1617 _ => SignalType::Event, }
1619}
1620
1621fn gesture_phase_from_code(code: u8) -> GesturePhase {
1622 match code {
1623 phase::START => GesturePhase::Start,
1624 phase::MOVE => GesturePhase::Move,
1625 phase::END => GesturePhase::End,
1626 phase::CANCEL => GesturePhase::Cancel,
1627 _ => GesturePhase::Start, }
1629}
1630
1631fn is_msgpack_map(byte: u8) -> bool {
1636 (byte & 0xF0) == 0x80 || byte == 0xDE || byte == 0xDF
1638}
1639
1640fn decode_v2_msgpack(bytes: &[u8]) -> Result<Message> {
1641 rmp_serde::from_slice(bytes).map_err(|e| Error::DecodeError(e.to_string()))
1642}
1643
1644#[cfg(test)]
1649mod tests {
1650 use super::*;
1651
1652 #[test]
1653 fn test_hello_roundtrip() {
1654 let msg = Message::Hello(HelloMessage {
1655 version: 1,
1656 name: "Test Client".to_string(),
1657 features: vec!["param".to_string(), "event".to_string()],
1658 capabilities: None,
1659 token: None,
1660 });
1661
1662 let encoded = encode(&msg).unwrap();
1663 let (decoded, frame) = decode(&encoded).unwrap();
1664
1665 match decoded {
1666 Message::Hello(hello) => {
1667 assert_eq!(hello.version, 1);
1668 assert_eq!(hello.name, "Test Client");
1669 assert!(hello.features.contains(&"param".to_string()));
1670 assert!(hello.features.contains(&"event".to_string()));
1671 }
1672 _ => panic!("Expected Hello message"),
1673 }
1674
1675 assert_eq!(frame.flags.qos, QoS::Fire);
1676 assert_eq!(frame.flags.version, 1); }
1678
1679 #[test]
1680 fn test_set_roundtrip() {
1681 let msg = Message::Set(SetMessage {
1682 address: "/test/value".to_string(),
1683 value: Value::Float(0.75),
1684 revision: Some(42),
1685 lock: false,
1686 unlock: false, ttl: None,
1687 });
1688
1689 let encoded = encode(&msg).unwrap();
1690 let (decoded, frame) = decode(&encoded).unwrap();
1691
1692 match decoded {
1693 Message::Set(set) => {
1694 assert_eq!(set.address, "/test/value");
1695 assert_eq!(set.value.as_f64(), Some(0.75));
1696 assert_eq!(set.revision, Some(42));
1697 }
1698 _ => panic!("Expected Set message"),
1699 }
1700
1701 assert_eq!(frame.flags.qos, QoS::Confirm);
1702 }
1703
1704 #[test]
1705 fn test_set_size_reduction() {
1706 let msg = Message::Set(SetMessage {
1707 address: "/test/value".to_string(),
1708 value: Value::Float(0.5),
1709 revision: Some(1),
1710 lock: false,
1711 unlock: false, ttl: None,
1712 });
1713
1714 let binary_payload = encode_message(&msg).unwrap();
1716
1717 let msgpack_payload = rmp_serde::to_vec_named(&msg).unwrap();
1719
1720 println!("Binary payload: {} bytes", binary_payload.len());
1721 println!("MessagePack payload: {} bytes", msgpack_payload.len());
1722
1723 assert!(
1725 binary_payload.len() < msgpack_payload.len(),
1726 "Binary encoding ({}) should be smaller than MessagePack ({})",
1727 binary_payload.len(),
1728 msgpack_payload.len()
1729 );
1730
1731 let savings = 100 - (binary_payload.len() * 100 / msgpack_payload.len());
1733 println!("Size reduction: {}%", savings);
1734 assert!(
1735 savings >= 40,
1736 "Expected at least 40% size reduction, got {}%",
1737 savings
1738 );
1739 }
1740
1741 #[test]
1742 fn test_bundle_roundtrip() {
1743 let msg = Message::Bundle(BundleMessage {
1744 timestamp: Some(1000000),
1745 messages: vec![
1746 Message::Set(SetMessage {
1747 address: "/light/1".to_string(),
1748 value: Value::Float(1.0),
1749 revision: None,
1750 lock: false,
1751 unlock: false, ttl: None,
1752 }),
1753 Message::Set(SetMessage {
1754 address: "/light/2".to_string(),
1755 value: Value::Float(0.0),
1756 revision: None,
1757 lock: false,
1758 unlock: false, ttl: None,
1759 }),
1760 ],
1761 });
1762
1763 let encoded = encode(&msg).unwrap();
1764 let (decoded, _) = decode(&encoded).unwrap();
1765
1766 match decoded {
1767 Message::Bundle(bundle) => {
1768 assert_eq!(bundle.timestamp, Some(1000000));
1769 assert_eq!(bundle.messages.len(), 2);
1770 }
1771 _ => panic!("Expected Bundle message"),
1772 }
1773 }
1774
1775 #[test]
1776 fn test_value_types() {
1777 let values = vec![
1778 Value::Null,
1779 Value::Bool(true),
1780 Value::Int(42),
1781 Value::Float(1.25),
1782 Value::String("hello".to_string()),
1783 Value::Array(vec![Value::Int(1), Value::Int(2), Value::Int(3)]),
1784 ];
1785
1786 for value in values {
1787 let msg = Message::Set(SetMessage {
1788 address: "/test".to_string(),
1789 value: value.clone(),
1790 revision: None,
1791 lock: false,
1792 unlock: false, ttl: None,
1793 });
1794
1795 let encoded = encode(&msg).unwrap();
1796 let (decoded, _) = decode(&encoded).unwrap();
1797
1798 match decoded {
1799 Message::Set(set) => {
1800 assert_eq!(set.value, value);
1801 }
1802 _ => panic!("Expected Set message"),
1803 }
1804 }
1805 }
1806
1807 #[test]
1808 fn test_backward_compat_v2_decode() {
1809 let msg = SetMessage {
1811 address: "/test/value".to_string(),
1812 value: Value::Float(0.5),
1813 revision: Some(1),
1814 lock: false,
1815 unlock: false, ttl: None,
1816 };
1817
1818 let v2_bytes = rmp_serde::to_vec_named(&Message::Set(msg.clone())).unwrap();
1820
1821 let decoded = decode_message(&v2_bytes).unwrap();
1823
1824 match decoded {
1825 Message::Set(set) => {
1826 assert_eq!(set.address, "/test/value");
1827 assert_eq!(set.value.as_f64(), Some(0.5));
1828 }
1829 _ => panic!("Expected Set message"),
1830 }
1831 }
1832
1833 #[test]
1834 fn test_ping_pong() {
1835 let ping = encode(&Message::Ping).unwrap();
1836 let (decoded, _) = decode(&ping).unwrap();
1837 assert!(matches!(decoded, Message::Ping));
1838
1839 let pong = encode(&Message::Pong).unwrap();
1840 let (decoded, _) = decode(&pong).unwrap();
1841 assert!(matches!(decoded, Message::Pong));
1842 }
1843
1844 #[test]
1845 fn test_publish_event() {
1846 let msg = Message::Publish(PublishMessage {
1847 address: "/cue/fire".to_string(),
1848 signal: Some(SignalType::Event),
1849 value: None,
1850 payload: Some(Value::String("intro".to_string())),
1851 samples: None,
1852 rate: None,
1853 id: None,
1854 phase: None,
1855 timestamp: Some(1234567890),
1856 timeline: None,
1857 });
1858
1859 let encoded = encode(&msg).unwrap();
1860 let (decoded, _) = decode(&encoded).unwrap();
1861
1862 match decoded {
1863 Message::Publish(pub_msg) => {
1864 assert_eq!(pub_msg.address, "/cue/fire");
1865 assert_eq!(pub_msg.signal, Some(SignalType::Event));
1866 assert_eq!(pub_msg.timestamp, Some(1234567890));
1867 }
1868 _ => panic!("Expected Publish message"),
1869 }
1870 }
1871
1872 #[test]
1873 fn test_subscribe_roundtrip() {
1874 let msg = Message::Subscribe(SubscribeMessage {
1875 id: 42,
1876 pattern: "/lumen/scene/*/layer/**".to_string(),
1877 types: vec![SignalType::Param, SignalType::Stream],
1878 options: Some(SubscribeOptions {
1879 max_rate: Some(60),
1880 epsilon: Some(0.01),
1881 history: None,
1882 window: None,
1883 }),
1884 });
1885
1886 let encoded = encode(&msg).unwrap();
1887 let (decoded, _) = decode(&encoded).unwrap();
1888
1889 match decoded {
1890 Message::Subscribe(sub) => {
1891 assert_eq!(sub.id, 42);
1892 assert_eq!(sub.pattern, "/lumen/scene/*/layer/**");
1893 assert!(sub.types.contains(&SignalType::Param));
1894 assert!(sub.types.contains(&SignalType::Stream));
1895 assert_eq!(sub.options.as_ref().unwrap().max_rate, Some(60));
1896 }
1897 _ => panic!("Expected Subscribe message"),
1898 }
1899 }
1900}