1use crate::types::*;
14use crate::{Error, Frame, QoS, Result};
15use bytes::{Buf, BufMut, Bytes, BytesMut};
16use std::collections::HashMap;
17
18pub const ENCODING_VERSION: u8 = 1;
20
21pub mod msg {
23 pub const HELLO: u8 = 0x01;
24 pub const WELCOME: u8 = 0x02;
25 pub const ANNOUNCE: u8 = 0x03;
26 pub const SUBSCRIBE: u8 = 0x10;
27 pub const UNSUBSCRIBE: u8 = 0x11;
28 pub const PUBLISH: u8 = 0x20;
29 pub const SET: u8 = 0x21;
30 pub const GET: u8 = 0x22;
31 pub const SNAPSHOT: u8 = 0x23;
32 pub const BUNDLE: u8 = 0x30;
33 pub const SYNC: u8 = 0x40;
34 pub const PING: u8 = 0x41;
35 pub const PONG: u8 = 0x42;
36 pub const ACK: u8 = 0x50;
37 pub const ERROR: u8 = 0x51;
38 pub const QUERY: u8 = 0x60;
39 pub const RESULT: u8 = 0x61;
40}
41
42pub mod val {
44 pub const NULL: u8 = 0x00;
45 pub const BOOL: u8 = 0x01;
46 pub const I8: u8 = 0x02;
47 pub const I16: u8 = 0x03;
48 pub const I32: u8 = 0x04;
49 pub const I64: u8 = 0x05;
50 pub const F32: u8 = 0x06;
51 pub const F64: u8 = 0x07;
52 pub const STRING: u8 = 0x08;
53 pub const BYTES: u8 = 0x09;
54 pub const ARRAY: u8 = 0x0A;
55 pub const MAP: u8 = 0x0B;
56}
57
58pub mod sig {
60 pub const PARAM: u8 = 0;
61 pub const EVENT: u8 = 1;
62 pub const STREAM: u8 = 2;
63 pub const GESTURE: u8 = 3;
64 pub const TIMELINE: u8 = 4;
65}
66
67pub mod phase {
69 pub const START: u8 = 0;
70 pub const MOVE: u8 = 1;
71 pub const END: u8 = 2;
72 pub const CANCEL: u8 = 3;
73}
74
75#[inline]
81pub fn encode_message(message: &Message) -> Result<Bytes> {
82 let capacity = estimate_message_size(message);
84 let mut buf = BytesMut::with_capacity(capacity);
85 encode_message_to_buf(&mut buf, message)?;
86 Ok(buf.freeze())
87}
88
89#[inline]
91fn estimate_message_size(msg: &Message) -> usize {
92 match msg {
93 Message::Set(m) => 2 + 2 + m.address.len() + 9 + if m.revision.is_some() { 8 } else { 0 },
94 Message::Publish(m) => 2 + 2 + m.address.len() + 16,
95 Message::Hello(m) => 4 + m.name.len() + 2,
96 Message::Welcome(m) => 12 + m.name.len() + m.session.len() + 4,
97 Message::Subscribe(m) => 6 + m.pattern.len() + 16,
98 Message::Bundle(m) => 12 + m.messages.len() * 48,
99 Message::Ping | Message::Pong => 5, _ => 64, }
102}
103
104#[inline]
106pub fn decode_message(bytes: &[u8]) -> Result<Message> {
107 if bytes.is_empty() {
108 return Err(Error::BufferTooSmall { needed: 1, have: 0 });
109 }
110
111 let first = bytes[0];
112
113 if is_msgpack_map(first) {
116 decode_v2_msgpack(bytes)
118 } else {
119 decode_v3_binary(bytes)
121 }
122}
123
124#[inline]
126pub fn encode(message: &Message) -> Result<Bytes> {
127 let payload = encode_message(message)?;
128 let mut frame = Frame::new(payload).with_qos(message.default_qos());
129 frame.flags.version = 1; frame.encode()
131}
132
133pub fn encode_with_options(
135 message: &Message,
136 qos: Option<QoS>,
137 timestamp: Option<u64>,
138) -> Result<Bytes> {
139 let payload = encode_message(message)?;
140 let mut frame = Frame::new(payload);
141 frame.flags.version = 1; if let Some(qos) = qos {
144 frame = frame.with_qos(qos);
145 } else {
146 frame = frame.with_qos(message.default_qos());
147 }
148
149 if let Some(ts) = timestamp {
150 frame = frame.with_timestamp(ts);
151 }
152
153 frame.encode()
154}
155
156#[inline]
158pub fn decode(bytes: &[u8]) -> Result<(Message, Frame)> {
159 let frame = Frame::decode(bytes)?;
160 let message = decode_message(&frame.payload)?;
161 Ok((message, frame))
162}
163
164pub fn encode_payload(message: &Message) -> Result<Vec<u8>> {
166 let bytes = encode_message(message)?;
167 Ok(bytes.to_vec())
168}
169
170pub fn decode_payload(bytes: &[u8]) -> Result<Message> {
172 decode_message(bytes)
173}
174
175fn encode_message_to_buf(buf: &mut BytesMut, msg: &Message) -> Result<()> {
180 match msg {
181 Message::Hello(m) => encode_hello(buf, m),
182 Message::Welcome(m) => encode_welcome(buf, m),
183 Message::Announce(m) => encode_announce(buf, m),
184 Message::Subscribe(m) => encode_subscribe(buf, m),
185 Message::Unsubscribe(m) => encode_unsubscribe(buf, m),
186 Message::Publish(m) => encode_publish(buf, m),
187 Message::Set(m) => encode_set(buf, m),
188 Message::Get(m) => encode_get(buf, m),
189 Message::Snapshot(m) => encode_snapshot(buf, m),
190 Message::Bundle(m) => encode_bundle(buf, m),
191 Message::Sync(m) => encode_sync(buf, m),
192 Message::Ping => {
193 buf.put_u8(msg::PING);
194 Ok(())
195 }
196 Message::Pong => {
197 buf.put_u8(msg::PONG);
198 Ok(())
199 }
200 Message::Ack(m) => encode_ack(buf, m),
201 Message::Error(m) => encode_error(buf, m),
202 Message::Query(m) => encode_query(buf, m),
203 Message::Result(m) => encode_result(buf, m),
204 }
205}
206
207#[inline]
210fn encode_set(buf: &mut BytesMut, msg: &SetMessage) -> Result<()> {
211 buf.put_u8(msg::SET);
212
213 let vtype = value_type_code(&msg.value);
214 let mut flags = vtype & 0x0F;
215 if msg.revision.is_some() {
216 flags |= 0x80;
217 }
218 if msg.lock {
219 flags |= 0x40;
220 }
221 if msg.unlock {
222 flags |= 0x20;
223 }
224 buf.put_u8(flags);
225
226 encode_string(buf, &msg.address)?;
228
229 encode_value_data(buf, &msg.value)?;
231
232 if let Some(rev) = msg.revision {
234 buf.put_u64(rev);
235 }
236
237 Ok(())
238}
239
240fn encode_publish(buf: &mut BytesMut, msg: &PublishMessage) -> Result<()> {
243 buf.put_u8(msg::PUBLISH);
244
245 let sig_code = msg
246 .signal
247 .map(|s| signal_type_code(s))
248 .unwrap_or(sig::EVENT);
249 let phase_code = msg
250 .phase
251 .map(|p| gesture_phase_code(p))
252 .unwrap_or(phase::START);
253
254 let mut flags: u8 = (sig_code & 0x07) << 5;
255 if msg.timestamp.is_some() {
256 flags |= 0x10;
257 }
258 if msg.id.is_some() {
259 flags |= 0x08;
260 }
261 flags |= phase_code & 0x07;
262 buf.put_u8(flags);
263
264 encode_string(buf, &msg.address)?;
266
267 if let Some(ref value) = msg.value {
269 buf.put_u8(1); buf.put_u8(value_type_code(value));
271 encode_value_data(buf, value)?;
272 } else if let Some(ref payload) = msg.payload {
273 buf.put_u8(1); buf.put_u8(value_type_code(payload));
275 encode_value_data(buf, payload)?;
276 } else if let Some(ref samples) = msg.samples {
277 buf.put_u8(2); buf.put_u16(samples.len() as u16);
279 for sample in samples {
280 buf.put_f64(*sample);
281 }
282 } else {
283 buf.put_u8(0); }
285
286 if let Some(ts) = msg.timestamp {
288 buf.put_u64(ts);
289 }
290
291 if let Some(id) = msg.id {
293 buf.put_u32(id);
294 }
295
296 if let Some(rate) = msg.rate {
298 buf.put_u32(rate);
299 }
300
301 Ok(())
302}
303
304fn encode_hello(buf: &mut BytesMut, msg: &HelloMessage) -> Result<()> {
306 buf.put_u8(msg::HELLO);
307 buf.put_u8(msg.version);
308
309 let mut features: u8 = 0;
311 for f in &msg.features {
312 match f.as_str() {
313 "param" => features |= 0x80,
314 "event" => features |= 0x40,
315 "stream" => features |= 0x20,
316 "gesture" => features |= 0x10,
317 "timeline" => features |= 0x08,
318 _ => {}
319 }
320 }
321 buf.put_u8(features);
322
323 encode_string(buf, &msg.name)?;
325
326 if let Some(ref token) = msg.token {
328 encode_string(buf, token)?;
329 } else {
330 buf.put_u16(0);
331 }
332
333 Ok(())
334}
335
336fn encode_welcome(buf: &mut BytesMut, msg: &WelcomeMessage) -> Result<()> {
338 buf.put_u8(msg::WELCOME);
339 buf.put_u8(msg.version);
340
341 let mut features: u8 = 0;
343 for f in &msg.features {
344 match f.as_str() {
345 "param" => features |= 0x80,
346 "event" => features |= 0x40,
347 "stream" => features |= 0x20,
348 "gesture" => features |= 0x10,
349 "timeline" => features |= 0x08,
350 _ => {}
351 }
352 }
353 buf.put_u8(features);
354
355 buf.put_u64(msg.time);
357
358 encode_string(buf, &msg.session)?;
360
361 encode_string(buf, &msg.name)?;
363
364 if let Some(ref token) = msg.token {
366 encode_string(buf, token)?;
367 } else {
368 buf.put_u16(0);
369 }
370
371 Ok(())
372}
373
374fn encode_announce(buf: &mut BytesMut, msg: &AnnounceMessage) -> Result<()> {
376 buf.put_u8(msg::ANNOUNCE);
377
378 encode_string(buf, &msg.namespace)?;
379 buf.put_u16(msg.signals.len() as u16);
380
381 for sig in &msg.signals {
382 encode_string(buf, &sig.address)?;
383 buf.put_u8(signal_type_code(sig.signal_type));
384
385 let mut opt_flags: u8 = 0;
387 if sig.datatype.is_some() {
388 opt_flags |= 0x01;
389 }
390 if sig.access.is_some() {
391 opt_flags |= 0x02;
392 }
393 if sig.meta.is_some() {
394 opt_flags |= 0x04;
395 }
396 buf.put_u8(opt_flags);
397
398 if let Some(ref dt) = sig.datatype {
399 encode_string(buf, dt)?;
400 }
401 if let Some(ref access) = sig.access {
402 encode_string(buf, access)?;
403 }
404 if let Some(ref meta) = sig.meta {
405 let mut meta_flags: u8 = 0;
407 if meta.unit.is_some() {
408 meta_flags |= 0x01;
409 }
410 if meta.range.is_some() {
411 meta_flags |= 0x02;
412 }
413 if meta.default.is_some() {
414 meta_flags |= 0x04;
415 }
416 if meta.description.is_some() {
417 meta_flags |= 0x08;
418 }
419 buf.put_u8(meta_flags);
420
421 if let Some(ref unit) = meta.unit {
422 encode_string(buf, unit)?;
423 }
424 if let Some((min, max)) = meta.range {
425 buf.put_f64(min);
426 buf.put_f64(max);
427 }
428 if let Some(ref default) = meta.default {
429 buf.put_u8(value_type_code(default));
430 encode_value_data(buf, default)?;
431 }
432 if let Some(ref desc) = meta.description {
433 encode_string(buf, desc)?;
434 }
435 }
436 }
437
438 Ok(())
439}
440
441fn encode_subscribe(buf: &mut BytesMut, msg: &SubscribeMessage) -> Result<()> {
443 buf.put_u8(msg::SUBSCRIBE);
444 buf.put_u32(msg.id);
445
446 encode_string(buf, &msg.pattern)?;
447
448 let mut type_mask: u8 = 0;
450 if msg.types.is_empty() {
451 type_mask = 0xFF; } else {
453 for t in &msg.types {
454 match t {
455 SignalType::Param => type_mask |= 0x01,
456 SignalType::Event => type_mask |= 0x02,
457 SignalType::Stream => type_mask |= 0x04,
458 SignalType::Gesture => type_mask |= 0x08,
459 SignalType::Timeline => type_mask |= 0x10,
460 }
461 }
462 }
463 buf.put_u8(type_mask);
464
465 if let Some(ref opts) = msg.options {
467 let mut opt_flags: u8 = 0;
468 if opts.max_rate.is_some() {
469 opt_flags |= 0x01;
470 }
471 if opts.epsilon.is_some() {
472 opt_flags |= 0x02;
473 }
474 if opts.history.is_some() {
475 opt_flags |= 0x04;
476 }
477 if opts.window.is_some() {
478 opt_flags |= 0x08;
479 }
480 buf.put_u8(opt_flags);
481
482 if let Some(rate) = opts.max_rate {
483 buf.put_u32(rate);
484 }
485 if let Some(eps) = opts.epsilon {
486 buf.put_f64(eps);
487 }
488 if let Some(hist) = opts.history {
489 buf.put_u32(hist);
490 }
491 if let Some(win) = opts.window {
492 buf.put_u32(win);
493 }
494 } else {
495 buf.put_u8(0); }
497
498 Ok(())
499}
500
501fn encode_unsubscribe(buf: &mut BytesMut, msg: &UnsubscribeMessage) -> Result<()> {
503 buf.put_u8(msg::UNSUBSCRIBE);
504 buf.put_u32(msg.id);
505 Ok(())
506}
507
508fn encode_get(buf: &mut BytesMut, msg: &GetMessage) -> Result<()> {
510 buf.put_u8(msg::GET);
511 encode_string(buf, &msg.address)?;
512 Ok(())
513}
514
515fn encode_snapshot(buf: &mut BytesMut, msg: &SnapshotMessage) -> Result<()> {
517 buf.put_u8(msg::SNAPSHOT);
518 buf.put_u16(msg.params.len() as u16);
519
520 for param in &msg.params {
521 encode_string(buf, ¶m.address)?;
522 buf.put_u8(value_type_code(¶m.value));
523 encode_value_data(buf, ¶m.value)?;
524 buf.put_u64(param.revision);
525
526 let mut opt_flags: u8 = 0;
527 if param.writer.is_some() {
528 opt_flags |= 0x01;
529 }
530 if param.timestamp.is_some() {
531 opt_flags |= 0x02;
532 }
533 buf.put_u8(opt_flags);
534
535 if let Some(ref writer) = param.writer {
536 encode_string(buf, writer)?;
537 }
538 if let Some(ts) = param.timestamp {
539 buf.put_u64(ts);
540 }
541 }
542
543 Ok(())
544}
545
546fn encode_bundle(buf: &mut BytesMut, msg: &BundleMessage) -> Result<()> {
548 buf.put_u8(msg::BUNDLE);
549
550 let mut flags: u8 = 0;
551 if msg.timestamp.is_some() {
552 flags |= 0x80;
553 }
554 buf.put_u8(flags);
555
556 buf.put_u16(msg.messages.len() as u16);
557
558 if let Some(ts) = msg.timestamp {
559 buf.put_u64(ts);
560 }
561
562 for inner_msg in &msg.messages {
564 let mut inner_buf = BytesMut::with_capacity(64);
565 encode_message_to_buf(&mut inner_buf, inner_msg)?;
566 buf.put_u16(inner_buf.len() as u16);
567 buf.extend_from_slice(&inner_buf);
568 }
569
570 Ok(())
571}
572
573fn encode_sync(buf: &mut BytesMut, msg: &SyncMessage) -> Result<()> {
575 buf.put_u8(msg::SYNC);
576
577 let mut flags: u8 = 0;
578 if msg.t2.is_some() {
579 flags |= 0x01;
580 }
581 if msg.t3.is_some() {
582 flags |= 0x02;
583 }
584 buf.put_u8(flags);
585
586 buf.put_u64(msg.t1);
587 if let Some(t2) = msg.t2 {
588 buf.put_u64(t2);
589 }
590 if let Some(t3) = msg.t3 {
591 buf.put_u64(t3);
592 }
593
594 Ok(())
595}
596
597fn encode_ack(buf: &mut BytesMut, msg: &AckMessage) -> Result<()> {
599 buf.put_u8(msg::ACK);
600
601 let mut flags: u8 = 0;
602 if msg.address.is_some() {
603 flags |= 0x01;
604 }
605 if msg.revision.is_some() {
606 flags |= 0x02;
607 }
608 if msg.locked.is_some() {
609 flags |= 0x04;
610 }
611 if msg.holder.is_some() {
612 flags |= 0x08;
613 }
614 if msg.correlation_id.is_some() {
615 flags |= 0x10;
616 }
617 buf.put_u8(flags);
618
619 if let Some(ref addr) = msg.address {
620 encode_string(buf, addr)?;
621 }
622 if let Some(rev) = msg.revision {
623 buf.put_u64(rev);
624 }
625 if let Some(locked) = msg.locked {
626 buf.put_u8(if locked { 1 } else { 0 });
627 }
628 if let Some(ref holder) = msg.holder {
629 encode_string(buf, holder)?;
630 }
631 if let Some(corr) = msg.correlation_id {
632 buf.put_u32(corr);
633 }
634
635 Ok(())
636}
637
638fn encode_error(buf: &mut BytesMut, msg: &ErrorMessage) -> Result<()> {
640 buf.put_u8(msg::ERROR);
641 buf.put_u16(msg.code);
642 encode_string(buf, &msg.message)?;
643
644 let mut flags: u8 = 0;
645 if msg.address.is_some() {
646 flags |= 0x01;
647 }
648 if msg.correlation_id.is_some() {
649 flags |= 0x02;
650 }
651 buf.put_u8(flags);
652
653 if let Some(ref addr) = msg.address {
654 encode_string(buf, addr)?;
655 }
656 if let Some(corr) = msg.correlation_id {
657 buf.put_u32(corr);
658 }
659
660 Ok(())
661}
662
663fn encode_query(buf: &mut BytesMut, msg: &QueryMessage) -> Result<()> {
665 buf.put_u8(msg::QUERY);
666 encode_string(buf, &msg.pattern)?;
667 Ok(())
668}
669
670fn encode_result(buf: &mut BytesMut, msg: &ResultMessage) -> Result<()> {
672 buf.put_u8(msg::RESULT);
673 buf.put_u16(msg.signals.len() as u16);
674
675 for sig in &msg.signals {
676 encode_string(buf, &sig.address)?;
677 buf.put_u8(signal_type_code(sig.signal_type));
678
679 let mut opt_flags: u8 = 0;
680 if sig.datatype.is_some() {
681 opt_flags |= 0x01;
682 }
683 if sig.access.is_some() {
684 opt_flags |= 0x02;
685 }
686 buf.put_u8(opt_flags);
687
688 if let Some(ref dt) = sig.datatype {
689 encode_string(buf, dt)?;
690 }
691 if let Some(ref access) = sig.access {
692 encode_string(buf, access)?;
693 }
694 }
695
696 Ok(())
697}
698
699#[inline(always)]
704fn encode_string(buf: &mut BytesMut, s: &str) -> Result<()> {
705 let bytes = s.as_bytes();
706 if bytes.len() > u16::MAX as usize {
707 return Err(Error::PayloadTooLarge(bytes.len()));
708 }
709 buf.put_u16(bytes.len() as u16);
710 buf.extend_from_slice(bytes);
711 Ok(())
712}
713
714#[inline]
715fn encode_value_data(buf: &mut BytesMut, value: &Value) -> Result<()> {
716 match value {
717 Value::Null => {} Value::Bool(b) => buf.put_u8(if *b { 1 } else { 0 }),
719 Value::Int(i) => buf.put_i64(*i),
720 Value::Float(f) => buf.put_f64(*f),
721 Value::String(s) => encode_string(buf, s)?,
722 Value::Bytes(b) => {
723 buf.put_u16(b.len() as u16);
724 buf.extend_from_slice(b);
725 }
726 Value::Array(arr) => {
727 buf.put_u16(arr.len() as u16);
728 for item in arr {
729 buf.put_u8(value_type_code(item));
730 encode_value_data(buf, item)?;
731 }
732 }
733 Value::Map(map) => {
734 buf.put_u16(map.len() as u16);
735 for (key, val) in map {
736 encode_string(buf, key)?;
737 buf.put_u8(value_type_code(val));
738 encode_value_data(buf, val)?;
739 }
740 }
741 }
742 Ok(())
743}
744
745#[inline(always)]
746fn value_type_code(value: &Value) -> u8 {
747 match value {
748 Value::Null => val::NULL,
749 Value::Bool(_) => val::BOOL,
750 Value::Int(_) => val::I64,
751 Value::Float(_) => val::F64,
752 Value::String(_) => val::STRING,
753 Value::Bytes(_) => val::BYTES,
754 Value::Array(_) => val::ARRAY,
755 Value::Map(_) => val::MAP,
756 }
757}
758
759fn signal_type_code(sig: SignalType) -> u8 {
760 match sig {
761 SignalType::Param => sig::PARAM,
762 SignalType::Event => sig::EVENT,
763 SignalType::Stream => sig::STREAM,
764 SignalType::Gesture => sig::GESTURE,
765 SignalType::Timeline => sig::TIMELINE,
766 }
767}
768
769fn gesture_phase_code(phase: GesturePhase) -> u8 {
770 match phase {
771 GesturePhase::Start => phase::START,
772 GesturePhase::Move => phase::MOVE,
773 GesturePhase::End => phase::END,
774 GesturePhase::Cancel => phase::CANCEL,
775 }
776}
777
778fn decode_v3_binary(bytes: &[u8]) -> Result<Message> {
783 if bytes.is_empty() {
784 return Err(Error::BufferTooSmall { needed: 1, have: 0 });
785 }
786
787 let mut buf = bytes;
788 let msg_type = buf.get_u8();
789
790 match msg_type {
791 msg::HELLO => decode_hello(&mut buf),
792 msg::WELCOME => decode_welcome(&mut buf),
793 msg::ANNOUNCE => decode_announce(&mut buf),
794 msg::SUBSCRIBE => decode_subscribe(&mut buf),
795 msg::UNSUBSCRIBE => decode_unsubscribe(&mut buf),
796 msg::PUBLISH => decode_publish(&mut buf),
797 msg::SET => decode_set(&mut buf),
798 msg::GET => decode_get(&mut buf),
799 msg::SNAPSHOT => decode_snapshot(&mut buf),
800 msg::BUNDLE => decode_bundle(&mut buf),
801 msg::SYNC => decode_sync(&mut buf),
802 msg::PING => Ok(Message::Ping),
803 msg::PONG => Ok(Message::Pong),
804 msg::ACK => decode_ack(&mut buf),
805 msg::ERROR => decode_error(&mut buf),
806 msg::QUERY => decode_query(&mut buf),
807 msg::RESULT => decode_result(&mut buf),
808 _ => Err(Error::UnknownMessageType(msg_type)),
809 }
810}
811
812#[inline]
813fn decode_set(buf: &mut &[u8]) -> Result<Message> {
814 let flags = buf.get_u8();
815 let vtype = flags & 0x0F;
816 let has_rev = (flags & 0x80) != 0;
817 let lock = (flags & 0x40) != 0;
818 let unlock = (flags & 0x20) != 0;
819
820 let address = decode_string(buf)?;
821 let value = decode_value_data(buf, vtype)?;
822
823 let revision = if has_rev { Some(buf.get_u64()) } else { None };
824
825 Ok(Message::Set(SetMessage {
826 address,
827 value,
828 revision,
829 lock,
830 unlock,
831 }))
832}
833
834fn decode_publish(buf: &mut &[u8]) -> Result<Message> {
835 let flags = buf.get_u8();
836 let sig_code = (flags >> 5) & 0x07;
837 let has_ts = (flags & 0x10) != 0;
838 let has_id = (flags & 0x08) != 0;
839 let phase_code = flags & 0x07;
840
841 let address = decode_string(buf)?;
842
843 let value_indicator = buf.get_u8();
845 let (value, payload, samples) = match value_indicator {
846 0 => (None, None, None),
847 1 => {
848 let vtype = buf.get_u8();
849 let v = decode_value_data(buf, vtype)?;
850 (Some(v), None, None)
851 }
852 2 => {
853 let count = buf.get_u16() as usize;
854 let mut s = Vec::with_capacity(count);
855 for _ in 0..count {
856 s.push(buf.get_f64());
857 }
858 (None, None, Some(s))
859 }
860 _ => (None, None, None),
861 };
862
863 let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
864 let id = if has_id { Some(buf.get_u32()) } else { None };
865
866 let rate = if buf.remaining() >= 4 {
868 Some(buf.get_u32())
869 } else {
870 None
871 };
872
873 let signal = Some(signal_type_from_code(sig_code));
874 let phase = Some(gesture_phase_from_code(phase_code));
875
876 Ok(Message::Publish(PublishMessage {
877 address,
878 signal,
879 value,
880 payload,
881 samples,
882 rate,
883 id,
884 phase,
885 timestamp,
886 timeline: None, }))
888}
889
890fn decode_hello(buf: &mut &[u8]) -> Result<Message> {
891 let version = buf.get_u8();
892 let feature_flags = buf.get_u8();
893
894 let mut features = Vec::new();
895 if feature_flags & 0x80 != 0 {
896 features.push("param".to_string());
897 }
898 if feature_flags & 0x40 != 0 {
899 features.push("event".to_string());
900 }
901 if feature_flags & 0x20 != 0 {
902 features.push("stream".to_string());
903 }
904 if feature_flags & 0x10 != 0 {
905 features.push("gesture".to_string());
906 }
907 if feature_flags & 0x08 != 0 {
908 features.push("timeline".to_string());
909 }
910
911 let name = decode_string(buf)?;
912 let token_str = decode_string(buf)?;
913 let token = if token_str.is_empty() {
914 None
915 } else {
916 Some(token_str)
917 };
918
919 Ok(Message::Hello(HelloMessage {
920 version,
921 name,
922 features,
923 capabilities: None,
924 token,
925 }))
926}
927
928fn decode_welcome(buf: &mut &[u8]) -> Result<Message> {
929 let version = buf.get_u8();
930 let feature_flags = buf.get_u8();
931
932 let mut features = Vec::new();
933 if feature_flags & 0x80 != 0 {
934 features.push("param".to_string());
935 }
936 if feature_flags & 0x40 != 0 {
937 features.push("event".to_string());
938 }
939 if feature_flags & 0x20 != 0 {
940 features.push("stream".to_string());
941 }
942 if feature_flags & 0x10 != 0 {
943 features.push("gesture".to_string());
944 }
945 if feature_flags & 0x08 != 0 {
946 features.push("timeline".to_string());
947 }
948
949 let time = buf.get_u64();
950 let session = decode_string(buf)?;
951 let name = decode_string(buf)?;
952
953 let token_str = decode_string(buf)?;
954 let token = if token_str.is_empty() {
955 None
956 } else {
957 Some(token_str)
958 };
959
960 Ok(Message::Welcome(WelcomeMessage {
961 version,
962 session,
963 name,
964 features,
965 time,
966 token,
967 }))
968}
969
970fn decode_announce(buf: &mut &[u8]) -> Result<Message> {
971 let namespace = decode_string(buf)?;
972 let count = buf.get_u16() as usize;
973
974 let mut signals = Vec::with_capacity(count);
975 for _ in 0..count {
976 let address = decode_string(buf)?;
977 let sig_code = buf.get_u8();
978 let opt_flags = buf.get_u8();
979
980 let datatype = if opt_flags & 0x01 != 0 {
981 Some(decode_string(buf)?)
982 } else {
983 None
984 };
985 let access = if opt_flags & 0x02 != 0 {
986 Some(decode_string(buf)?)
987 } else {
988 None
989 };
990
991 let meta = if opt_flags & 0x04 != 0 {
992 let meta_flags = buf.get_u8();
993
994 let unit = if meta_flags & 0x01 != 0 {
995 Some(decode_string(buf)?)
996 } else {
997 None
998 };
999 let range = if meta_flags & 0x02 != 0 {
1000 let min = buf.get_f64();
1001 let max = buf.get_f64();
1002 Some((min, max))
1003 } else {
1004 None
1005 };
1006 let default = if meta_flags & 0x04 != 0 {
1007 let vtype = buf.get_u8();
1008 Some(decode_value_data(buf, vtype)?)
1009 } else {
1010 None
1011 };
1012 let description = if meta_flags & 0x08 != 0 {
1013 Some(decode_string(buf)?)
1014 } else {
1015 None
1016 };
1017
1018 Some(SignalMeta {
1019 unit,
1020 range,
1021 default,
1022 description,
1023 })
1024 } else {
1025 None
1026 };
1027
1028 signals.push(SignalDefinition {
1029 address,
1030 signal_type: signal_type_from_code(sig_code),
1031 datatype,
1032 access,
1033 meta,
1034 });
1035 }
1036
1037 Ok(Message::Announce(AnnounceMessage {
1038 namespace,
1039 signals,
1040 meta: None,
1041 }))
1042}
1043
1044fn decode_subscribe(buf: &mut &[u8]) -> Result<Message> {
1045 let id = buf.get_u32();
1046 let pattern = decode_string(buf)?;
1047 let type_mask = buf.get_u8();
1048
1049 let mut types = Vec::new();
1050 if type_mask == 0xFF {
1051 } else {
1053 if type_mask & 0x01 != 0 {
1054 types.push(SignalType::Param);
1055 }
1056 if type_mask & 0x02 != 0 {
1057 types.push(SignalType::Event);
1058 }
1059 if type_mask & 0x04 != 0 {
1060 types.push(SignalType::Stream);
1061 }
1062 if type_mask & 0x08 != 0 {
1063 types.push(SignalType::Gesture);
1064 }
1065 if type_mask & 0x10 != 0 {
1066 types.push(SignalType::Timeline);
1067 }
1068 }
1069
1070 let opt_flags = buf.get_u8();
1071 let options = if opt_flags != 0 {
1072 let max_rate = if opt_flags & 0x01 != 0 {
1073 Some(buf.get_u32())
1074 } else {
1075 None
1076 };
1077 let epsilon = if opt_flags & 0x02 != 0 {
1078 Some(buf.get_f64())
1079 } else {
1080 None
1081 };
1082 let history = if opt_flags & 0x04 != 0 {
1083 Some(buf.get_u32())
1084 } else {
1085 None
1086 };
1087 let window = if opt_flags & 0x08 != 0 {
1088 Some(buf.get_u32())
1089 } else {
1090 None
1091 };
1092
1093 Some(SubscribeOptions {
1094 max_rate,
1095 epsilon,
1096 history,
1097 window,
1098 })
1099 } else {
1100 None
1101 };
1102
1103 Ok(Message::Subscribe(SubscribeMessage {
1104 id,
1105 pattern,
1106 types,
1107 options,
1108 }))
1109}
1110
1111fn decode_unsubscribe(buf: &mut &[u8]) -> Result<Message> {
1112 let id = buf.get_u32();
1113 Ok(Message::Unsubscribe(UnsubscribeMessage { id }))
1114}
1115
1116fn decode_get(buf: &mut &[u8]) -> Result<Message> {
1117 let address = decode_string(buf)?;
1118 Ok(Message::Get(GetMessage { address }))
1119}
1120
1121fn decode_snapshot(buf: &mut &[u8]) -> Result<Message> {
1122 let count = buf.get_u16() as usize;
1123 let mut params = Vec::with_capacity(count);
1124
1125 for _ in 0..count {
1126 let address = decode_string(buf)?;
1127 let vtype = buf.get_u8();
1128 let value = decode_value_data(buf, vtype)?;
1129 let revision = buf.get_u64();
1130 let opt_flags = buf.get_u8();
1131
1132 let writer = if opt_flags & 0x01 != 0 {
1133 Some(decode_string(buf)?)
1134 } else {
1135 None
1136 };
1137 let timestamp = if opt_flags & 0x02 != 0 {
1138 Some(buf.get_u64())
1139 } else {
1140 None
1141 };
1142
1143 params.push(ParamValue {
1144 address,
1145 value,
1146 revision,
1147 writer,
1148 timestamp,
1149 });
1150 }
1151
1152 Ok(Message::Snapshot(SnapshotMessage { params }))
1153}
1154
1155fn decode_bundle(buf: &mut &[u8]) -> Result<Message> {
1156 let flags = buf.get_u8();
1157 let has_ts = (flags & 0x80) != 0;
1158 let count = buf.get_u16() as usize;
1159
1160 let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
1161
1162 let mut messages = Vec::with_capacity(count);
1163 for _ in 0..count {
1164 let len = buf.get_u16() as usize;
1165 let inner_bytes = &buf[..len];
1166 buf.advance(len);
1167 messages.push(decode_v3_binary(inner_bytes)?);
1168 }
1169
1170 Ok(Message::Bundle(BundleMessage {
1171 timestamp,
1172 messages,
1173 }))
1174}
1175
1176fn decode_sync(buf: &mut &[u8]) -> Result<Message> {
1177 let flags = buf.get_u8();
1178 let t1 = buf.get_u64();
1179 let t2 = if flags & 0x01 != 0 {
1180 Some(buf.get_u64())
1181 } else {
1182 None
1183 };
1184 let t3 = if flags & 0x02 != 0 {
1185 Some(buf.get_u64())
1186 } else {
1187 None
1188 };
1189
1190 Ok(Message::Sync(SyncMessage { t1, t2, t3 }))
1191}
1192
1193fn decode_ack(buf: &mut &[u8]) -> Result<Message> {
1194 let flags = buf.get_u8();
1195
1196 let address = if flags & 0x01 != 0 {
1197 Some(decode_string(buf)?)
1198 } else {
1199 None
1200 };
1201 let revision = if flags & 0x02 != 0 {
1202 Some(buf.get_u64())
1203 } else {
1204 None
1205 };
1206 let locked = if flags & 0x04 != 0 {
1207 Some(buf.get_u8() != 0)
1208 } else {
1209 None
1210 };
1211 let holder = if flags & 0x08 != 0 {
1212 Some(decode_string(buf)?)
1213 } else {
1214 None
1215 };
1216 let correlation_id = if flags & 0x10 != 0 {
1217 Some(buf.get_u32())
1218 } else {
1219 None
1220 };
1221
1222 Ok(Message::Ack(AckMessage {
1223 address,
1224 revision,
1225 locked,
1226 holder,
1227 correlation_id,
1228 }))
1229}
1230
1231fn decode_error(buf: &mut &[u8]) -> Result<Message> {
1232 let code = buf.get_u16();
1233 let message = decode_string(buf)?;
1234 let flags = buf.get_u8();
1235
1236 let address = if flags & 0x01 != 0 {
1237 Some(decode_string(buf)?)
1238 } else {
1239 None
1240 };
1241 let correlation_id = if flags & 0x02 != 0 {
1242 Some(buf.get_u32())
1243 } else {
1244 None
1245 };
1246
1247 Ok(Message::Error(ErrorMessage {
1248 code,
1249 message,
1250 address,
1251 correlation_id,
1252 }))
1253}
1254
1255fn decode_query(buf: &mut &[u8]) -> Result<Message> {
1256 let pattern = decode_string(buf)?;
1257 Ok(Message::Query(QueryMessage { pattern }))
1258}
1259
1260fn decode_result(buf: &mut &[u8]) -> Result<Message> {
1261 let count = buf.get_u16() as usize;
1262 let mut signals = Vec::with_capacity(count);
1263
1264 for _ in 0..count {
1265 let address = decode_string(buf)?;
1266 let sig_code = buf.get_u8();
1267 let opt_flags = buf.get_u8();
1268
1269 let datatype = if opt_flags & 0x01 != 0 {
1270 Some(decode_string(buf)?)
1271 } else {
1272 None
1273 };
1274 let access = if opt_flags & 0x02 != 0 {
1275 Some(decode_string(buf)?)
1276 } else {
1277 None
1278 };
1279
1280 signals.push(SignalDefinition {
1281 address,
1282 signal_type: signal_type_from_code(sig_code),
1283 datatype,
1284 access,
1285 meta: None,
1286 });
1287 }
1288
1289 Ok(Message::Result(ResultMessage { signals }))
1290}
1291
1292#[inline(always)]
1297fn decode_string(buf: &mut &[u8]) -> Result<String> {
1298 if buf.remaining() < 2 {
1299 return Err(Error::BufferTooSmall {
1300 needed: 2,
1301 have: buf.remaining(),
1302 });
1303 }
1304 let len = buf.get_u16() as usize;
1305 if buf.remaining() < len {
1306 return Err(Error::BufferTooSmall {
1307 needed: len,
1308 have: buf.remaining(),
1309 });
1310 }
1311 let bytes = &buf[..len];
1312 buf.advance(len);
1313 String::from_utf8(bytes.to_vec()).map_err(|e| Error::DecodeError(e.to_string()))
1314}
1315
1316#[inline]
1317fn decode_value_data(buf: &mut &[u8], vtype: u8) -> Result<Value> {
1318 match vtype {
1319 val::NULL => Ok(Value::Null),
1320 val::BOOL => {
1321 let b = buf.get_u8();
1322 Ok(Value::Bool(b != 0))
1323 }
1324 val::I8 => {
1325 let i = buf.get_i8() as i64;
1326 Ok(Value::Int(i))
1327 }
1328 val::I16 => {
1329 let i = buf.get_i16() as i64;
1330 Ok(Value::Int(i))
1331 }
1332 val::I32 => {
1333 let i = buf.get_i32() as i64;
1334 Ok(Value::Int(i))
1335 }
1336 val::I64 => {
1337 let i = buf.get_i64();
1338 Ok(Value::Int(i))
1339 }
1340 val::F32 => {
1341 let f = buf.get_f32() as f64;
1342 Ok(Value::Float(f))
1343 }
1344 val::F64 => {
1345 let f = buf.get_f64();
1346 Ok(Value::Float(f))
1347 }
1348 val::STRING => {
1349 let s = decode_string(buf)?;
1350 Ok(Value::String(s))
1351 }
1352 val::BYTES => {
1353 if buf.remaining() < 2 {
1354 return Err(Error::BufferTooSmall {
1355 needed: 2,
1356 have: buf.remaining(),
1357 });
1358 }
1359 let len = buf.get_u16() as usize;
1360 if buf.remaining() < len {
1361 return Err(Error::BufferTooSmall {
1362 needed: len,
1363 have: buf.remaining(),
1364 });
1365 }
1366 let bytes = buf[..len].to_vec();
1367 buf.advance(len);
1368 Ok(Value::Bytes(bytes))
1369 }
1370 val::ARRAY => {
1371 let count = buf.get_u16() as usize;
1372 let mut arr = Vec::with_capacity(count);
1373 for _ in 0..count {
1374 let item_type = buf.get_u8();
1375 arr.push(decode_value_data(buf, item_type)?);
1376 }
1377 Ok(Value::Array(arr))
1378 }
1379 val::MAP => {
1380 let count = buf.get_u16() as usize;
1381 let mut map = HashMap::with_capacity(count);
1382 for _ in 0..count {
1383 let key = decode_string(buf)?;
1384 let val_type = buf.get_u8();
1385 let val = decode_value_data(buf, val_type)?;
1386 map.insert(key, val);
1387 }
1388 Ok(Value::Map(map))
1389 }
1390 _ => Err(Error::DecodeError(format!(
1391 "unknown value type: 0x{:02x}",
1392 vtype
1393 ))),
1394 }
1395}
1396
1397fn signal_type_from_code(code: u8) -> SignalType {
1398 match code {
1399 sig::PARAM => SignalType::Param,
1400 sig::EVENT => SignalType::Event,
1401 sig::STREAM => SignalType::Stream,
1402 sig::GESTURE => SignalType::Gesture,
1403 sig::TIMELINE => SignalType::Timeline,
1404 _ => SignalType::Event, }
1406}
1407
1408fn gesture_phase_from_code(code: u8) -> GesturePhase {
1409 match code {
1410 phase::START => GesturePhase::Start,
1411 phase::MOVE => GesturePhase::Move,
1412 phase::END => GesturePhase::End,
1413 phase::CANCEL => GesturePhase::Cancel,
1414 _ => GesturePhase::Start, }
1416}
1417
1418fn is_msgpack_map(byte: u8) -> bool {
1423 (byte & 0xF0) == 0x80 || byte == 0xDE || byte == 0xDF
1425}
1426
1427fn decode_v2_msgpack(bytes: &[u8]) -> Result<Message> {
1428 rmp_serde::from_slice(bytes).map_err(|e| Error::DecodeError(e.to_string()))
1429}
1430
1431#[cfg(test)]
1436mod tests {
1437 use super::*;
1438
1439 #[test]
1440 fn test_hello_roundtrip() {
1441 let msg = Message::Hello(HelloMessage {
1442 version: 1,
1443 name: "Test Client".to_string(),
1444 features: vec!["param".to_string(), "event".to_string()],
1445 capabilities: None,
1446 token: None,
1447 });
1448
1449 let encoded = encode(&msg).unwrap();
1450 let (decoded, frame) = decode(&encoded).unwrap();
1451
1452 match decoded {
1453 Message::Hello(hello) => {
1454 assert_eq!(hello.version, 1);
1455 assert_eq!(hello.name, "Test Client");
1456 assert!(hello.features.contains(&"param".to_string()));
1457 assert!(hello.features.contains(&"event".to_string()));
1458 }
1459 _ => panic!("Expected Hello message"),
1460 }
1461
1462 assert_eq!(frame.flags.qos, QoS::Fire);
1463 assert_eq!(frame.flags.version, 1); }
1465
1466 #[test]
1467 fn test_set_roundtrip() {
1468 let msg = Message::Set(SetMessage {
1469 address: "/test/value".to_string(),
1470 value: Value::Float(0.75),
1471 revision: Some(42),
1472 lock: false,
1473 unlock: false,
1474 });
1475
1476 let encoded = encode(&msg).unwrap();
1477 let (decoded, frame) = decode(&encoded).unwrap();
1478
1479 match decoded {
1480 Message::Set(set) => {
1481 assert_eq!(set.address, "/test/value");
1482 assert_eq!(set.value.as_f64(), Some(0.75));
1483 assert_eq!(set.revision, Some(42));
1484 }
1485 _ => panic!("Expected Set message"),
1486 }
1487
1488 assert_eq!(frame.flags.qos, QoS::Confirm);
1489 }
1490
1491 #[test]
1492 fn test_set_size_reduction() {
1493 let msg = Message::Set(SetMessage {
1494 address: "/test/value".to_string(),
1495 value: Value::Float(0.5),
1496 revision: Some(1),
1497 lock: false,
1498 unlock: false,
1499 });
1500
1501 let binary_payload = encode_message(&msg).unwrap();
1503
1504 let msgpack_payload = rmp_serde::to_vec_named(&msg).unwrap();
1506
1507 println!("Binary payload: {} bytes", binary_payload.len());
1508 println!("MessagePack payload: {} bytes", msgpack_payload.len());
1509
1510 assert!(
1512 binary_payload.len() < msgpack_payload.len(),
1513 "Binary encoding ({}) should be smaller than MessagePack ({})",
1514 binary_payload.len(),
1515 msgpack_payload.len()
1516 );
1517
1518 let savings = 100 - (binary_payload.len() * 100 / msgpack_payload.len());
1520 println!("Size reduction: {}%", savings);
1521 assert!(
1522 savings >= 40,
1523 "Expected at least 40% size reduction, got {}%",
1524 savings
1525 );
1526 }
1527
1528 #[test]
1529 fn test_bundle_roundtrip() {
1530 let msg = Message::Bundle(BundleMessage {
1531 timestamp: Some(1000000),
1532 messages: vec![
1533 Message::Set(SetMessage {
1534 address: "/light/1".to_string(),
1535 value: Value::Float(1.0),
1536 revision: None,
1537 lock: false,
1538 unlock: false,
1539 }),
1540 Message::Set(SetMessage {
1541 address: "/light/2".to_string(),
1542 value: Value::Float(0.0),
1543 revision: None,
1544 lock: false,
1545 unlock: false,
1546 }),
1547 ],
1548 });
1549
1550 let encoded = encode(&msg).unwrap();
1551 let (decoded, _) = decode(&encoded).unwrap();
1552
1553 match decoded {
1554 Message::Bundle(bundle) => {
1555 assert_eq!(bundle.timestamp, Some(1000000));
1556 assert_eq!(bundle.messages.len(), 2);
1557 }
1558 _ => panic!("Expected Bundle message"),
1559 }
1560 }
1561
1562 #[test]
1563 fn test_value_types() {
1564 let values = vec![
1565 Value::Null,
1566 Value::Bool(true),
1567 Value::Int(42),
1568 Value::Float(1.25),
1569 Value::String("hello".to_string()),
1570 Value::Array(vec![Value::Int(1), Value::Int(2), Value::Int(3)]),
1571 ];
1572
1573 for value in values {
1574 let msg = Message::Set(SetMessage {
1575 address: "/test".to_string(),
1576 value: value.clone(),
1577 revision: None,
1578 lock: false,
1579 unlock: false,
1580 });
1581
1582 let encoded = encode(&msg).unwrap();
1583 let (decoded, _) = decode(&encoded).unwrap();
1584
1585 match decoded {
1586 Message::Set(set) => {
1587 assert_eq!(set.value, value);
1588 }
1589 _ => panic!("Expected Set message"),
1590 }
1591 }
1592 }
1593
1594 #[test]
1595 fn test_backward_compat_v2_decode() {
1596 let msg = SetMessage {
1598 address: "/test/value".to_string(),
1599 value: Value::Float(0.5),
1600 revision: Some(1),
1601 lock: false,
1602 unlock: false,
1603 };
1604
1605 let v2_bytes = rmp_serde::to_vec_named(&Message::Set(msg.clone())).unwrap();
1607
1608 let decoded = decode_message(&v2_bytes).unwrap();
1610
1611 match decoded {
1612 Message::Set(set) => {
1613 assert_eq!(set.address, "/test/value");
1614 assert_eq!(set.value.as_f64(), Some(0.5));
1615 }
1616 _ => panic!("Expected Set message"),
1617 }
1618 }
1619
1620 #[test]
1621 fn test_ping_pong() {
1622 let ping = encode(&Message::Ping).unwrap();
1623 let (decoded, _) = decode(&ping).unwrap();
1624 assert!(matches!(decoded, Message::Ping));
1625
1626 let pong = encode(&Message::Pong).unwrap();
1627 let (decoded, _) = decode(&pong).unwrap();
1628 assert!(matches!(decoded, Message::Pong));
1629 }
1630
1631 #[test]
1632 fn test_publish_event() {
1633 let msg = Message::Publish(PublishMessage {
1634 address: "/cue/fire".to_string(),
1635 signal: Some(SignalType::Event),
1636 value: None,
1637 payload: Some(Value::String("intro".to_string())),
1638 samples: None,
1639 rate: None,
1640 id: None,
1641 phase: None,
1642 timestamp: Some(1234567890),
1643 timeline: None,
1644 });
1645
1646 let encoded = encode(&msg).unwrap();
1647 let (decoded, _) = decode(&encoded).unwrap();
1648
1649 match decoded {
1650 Message::Publish(pub_msg) => {
1651 assert_eq!(pub_msg.address, "/cue/fire");
1652 assert_eq!(pub_msg.signal, Some(SignalType::Event));
1653 assert_eq!(pub_msg.timestamp, Some(1234567890));
1654 }
1655 _ => panic!("Expected Publish message"),
1656 }
1657 }
1658
1659 #[test]
1660 fn test_subscribe_roundtrip() {
1661 let msg = Message::Subscribe(SubscribeMessage {
1662 id: 42,
1663 pattern: "/lumen/scene/*/layer/**".to_string(),
1664 types: vec![SignalType::Param, SignalType::Stream],
1665 options: Some(SubscribeOptions {
1666 max_rate: Some(60),
1667 epsilon: Some(0.01),
1668 history: None,
1669 window: None,
1670 }),
1671 });
1672
1673 let encoded = encode(&msg).unwrap();
1674 let (decoded, _) = decode(&encoded).unwrap();
1675
1676 match decoded {
1677 Message::Subscribe(sub) => {
1678 assert_eq!(sub.id, 42);
1679 assert_eq!(sub.pattern, "/lumen/scene/*/layer/**");
1680 assert!(sub.types.contains(&SignalType::Param));
1681 assert!(sub.types.contains(&SignalType::Stream));
1682 assert_eq!(sub.options.as_ref().unwrap().max_rate, Some(60));
1683 }
1684 _ => panic!("Expected Subscribe message"),
1685 }
1686 }
1687}