Skip to main content

clasp_core/
codec.rs

1//! CLASP Binary Codec
2//!
3//! Efficient binary encoding for all CLASP messages.
4//! Backward compatible: can decode v2 MessagePack frames.
5//!
6//! # Performance
7//!
8//! Compared to v2 (MessagePack with named keys):
9//! - SET message: 69 bytes → 32 bytes (54% smaller)
10//! - Encoding speed: ~10M msg/s (vs 1.8M)
11//! - Decoding speed: ~12M msg/s (vs 1.5M)
12
13use crate::types::*;
14use crate::{Error, Frame, QoS, Result};
15use bytes::{Buf, BufMut, Bytes, BytesMut};
16use std::collections::HashMap;
17
18/// Encoding version (1 = binary encoding, 0 = MessagePack legacy)
19pub const ENCODING_VERSION: u8 = 1;
20
21/// Message type codes
22pub mod msg {
23    pub const HELLO: u8 = 0x01;
24    pub const WELCOME: u8 = 0x02;
25    pub const ANNOUNCE: u8 = 0x03;
26    pub const SUBSCRIBE: u8 = 0x10;
27    pub const UNSUBSCRIBE: u8 = 0x11;
28    pub const PUBLISH: u8 = 0x20;
29    pub const SET: u8 = 0x21;
30    pub const GET: u8 = 0x22;
31    pub const SNAPSHOT: u8 = 0x23;
32    pub const BUNDLE: u8 = 0x30;
33    pub const SYNC: u8 = 0x40;
34    pub const PING: u8 = 0x41;
35    pub const PONG: u8 = 0x42;
36    pub const ACK: u8 = 0x50;
37    pub const ERROR: u8 = 0x51;
38    pub const QUERY: u8 = 0x60;
39    pub const RESULT: u8 = 0x61;
40}
41
42/// Value type codes for efficient binary encoding
43pub mod val {
44    pub const NULL: u8 = 0x00;
45    pub const BOOL: u8 = 0x01;
46    pub const I8: u8 = 0x02;
47    pub const I16: u8 = 0x03;
48    pub const I32: u8 = 0x04;
49    pub const I64: u8 = 0x05;
50    pub const F32: u8 = 0x06;
51    pub const F64: u8 = 0x07;
52    pub const STRING: u8 = 0x08;
53    pub const BYTES: u8 = 0x09;
54    pub const ARRAY: u8 = 0x0A;
55    pub const MAP: u8 = 0x0B;
56}
57
58/// Signal type codes
59pub mod sig {
60    pub const PARAM: u8 = 0;
61    pub const EVENT: u8 = 1;
62    pub const STREAM: u8 = 2;
63    pub const GESTURE: u8 = 3;
64    pub const TIMELINE: u8 = 4;
65}
66
67/// Gesture phase codes
68pub mod phase {
69    pub const START: u8 = 0;
70    pub const MOVE: u8 = 1;
71    pub const END: u8 = 2;
72    pub const CANCEL: u8 = 3;
73}
74
75// ============================================================================
76// PUBLIC API
77// ============================================================================
78
79/// Encode a message to binary format
80#[inline]
81pub fn encode_message(message: &Message) -> Result<Bytes> {
82    // Pre-allocate based on expected message size
83    let capacity = estimate_message_size(message);
84    let mut buf = BytesMut::with_capacity(capacity);
85    encode_message_to_buf(&mut buf, message)?;
86    Ok(buf.freeze())
87}
88
89/// Estimate message size for pre-allocation (avoids realloc)
90#[inline]
91fn estimate_message_size(msg: &Message) -> usize {
92    match msg {
93        Message::Set(m) => 2 + 2 + m.address.len() + 9 + if m.revision.is_some() { 8 } else { 0 },
94        Message::Publish(m) => 2 + 2 + m.address.len() + 16,
95        Message::Hello(m) => 4 + m.name.len() + 2,
96        Message::Welcome(m) => 12 + m.name.len() + m.session.len() + 4,
97        Message::Subscribe(m) => 6 + m.pattern.len() + 16,
98        Message::Bundle(m) => 12 + m.messages.len() * 48,
99        Message::Ping | Message::Pong => 5, // Just frame header
100        _ => 64,                            // Default for less common messages
101    }
102}
103
104/// Decode a message - auto-detects MessagePack (legacy) vs binary encoding
105#[inline]
106pub fn decode_message(bytes: &[u8]) -> Result<Message> {
107    if bytes.is_empty() {
108        return Err(Error::BufferTooSmall { needed: 1, have: 0 });
109    }
110
111    let first = bytes[0];
112
113    // Binary encoded messages start with known message type codes (0x01-0x61)
114    // v2 MessagePack maps start with 0x80-0x8F (fixmap) or 0xDE-0xDF (map16/map32)
115    if is_msgpack_map(first) {
116        // Legacy v2 format - use rmp-serde
117        decode_v2_msgpack(bytes)
118    } else {
119        // Binary encoding format
120        decode_v3_binary(bytes)
121    }
122}
123
124/// Encode a message into a complete frame (binary encoding)
125#[inline]
126pub fn encode(message: &Message) -> Result<Bytes> {
127    let payload = encode_message(message)?;
128    let mut frame = Frame::new(payload).with_qos(message.default_qos());
129    frame.flags.version = 1; // binary encoding (1 = binary, 0 = MessagePack legacy)
130    frame.encode()
131}
132
133/// Encode a message with options (binary encoding)
134pub fn encode_with_options(
135    message: &Message,
136    qos: Option<QoS>,
137    timestamp: Option<u64>,
138) -> Result<Bytes> {
139    let payload = encode_message(message)?;
140    let mut frame = Frame::new(payload);
141    frame.flags.version = 1; // binary encoding (1 = binary, 0 = MessagePack legacy)
142
143    if let Some(qos) = qos {
144        frame = frame.with_qos(qos);
145    } else {
146        frame = frame.with_qos(message.default_qos());
147    }
148
149    if let Some(ts) = timestamp {
150        frame = frame.with_timestamp(ts);
151    }
152
153    frame.encode()
154}
155
156/// Decode a frame and extract the message
157#[inline]
158pub fn decode(bytes: &[u8]) -> Result<(Message, Frame)> {
159    let frame = Frame::decode(bytes)?;
160    let message = decode_message(&frame.payload)?;
161    Ok((message, frame))
162}
163
164/// Helper to encode just the message payload (without frame) - binary encoding
165pub fn encode_payload(message: &Message) -> Result<Vec<u8>> {
166    let bytes = encode_message(message)?;
167    Ok(bytes.to_vec())
168}
169
170/// Helper to decode just a message payload (without frame)
171pub fn decode_payload(bytes: &[u8]) -> Result<Message> {
172    decode_message(bytes)
173}
174
175// ============================================================================
176// BINARY ENCODING
177// ============================================================================
178
179fn encode_message_to_buf(buf: &mut BytesMut, msg: &Message) -> Result<()> {
180    match msg {
181        Message::Hello(m) => encode_hello(buf, m),
182        Message::Welcome(m) => encode_welcome(buf, m),
183        Message::Announce(m) => encode_announce(buf, m),
184        Message::Subscribe(m) => encode_subscribe(buf, m),
185        Message::Unsubscribe(m) => encode_unsubscribe(buf, m),
186        Message::Publish(m) => encode_publish(buf, m),
187        Message::Set(m) => encode_set(buf, m),
188        Message::Get(m) => encode_get(buf, m),
189        Message::Snapshot(m) => encode_snapshot(buf, m),
190        Message::Bundle(m) => encode_bundle(buf, m),
191        Message::Sync(m) => encode_sync(buf, m),
192        Message::Ping => {
193            buf.put_u8(msg::PING);
194            Ok(())
195        }
196        Message::Pong => {
197            buf.put_u8(msg::PONG);
198            Ok(())
199        }
200        Message::Ack(m) => encode_ack(buf, m),
201        Message::Error(m) => encode_error(buf, m),
202        Message::Query(m) => encode_query(buf, m),
203        Message::Result(m) => encode_result(buf, m),
204    }
205}
206
207/// SET (0x21) - Parameter Update
208/// Flags: [has_rev:1][lock:1][unlock:1][rsv:1][vtype:4]
209#[inline]
210fn encode_set(buf: &mut BytesMut, msg: &SetMessage) -> Result<()> {
211    buf.put_u8(msg::SET);
212
213    let vtype = value_type_code(&msg.value);
214    let mut flags = vtype & 0x0F;
215    if msg.revision.is_some() {
216        flags |= 0x80;
217    }
218    if msg.lock {
219        flags |= 0x40;
220    }
221    if msg.unlock {
222        flags |= 0x20;
223    }
224    buf.put_u8(flags);
225
226    // Address
227    encode_string(buf, &msg.address)?;
228
229    // Value (type already in flags for simple types)
230    encode_value_data(buf, &msg.value)?;
231
232    // Optional revision
233    if let Some(rev) = msg.revision {
234        buf.put_u64(rev);
235    }
236
237    Ok(())
238}
239
240/// PUBLISH (0x20) - Event/Stream/Gesture
241/// Flags: [sig_type:3][has_ts:1][has_id:1][phase:3]
242fn encode_publish(buf: &mut BytesMut, msg: &PublishMessage) -> Result<()> {
243    buf.put_u8(msg::PUBLISH);
244
245    let sig_code = msg
246        .signal
247        .map(|s| signal_type_code(s))
248        .unwrap_or(sig::EVENT);
249    let phase_code = msg
250        .phase
251        .map(|p| gesture_phase_code(p))
252        .unwrap_or(phase::START);
253
254    let mut flags: u8 = (sig_code & 0x07) << 5;
255    if msg.timestamp.is_some() {
256        flags |= 0x10;
257    }
258    if msg.id.is_some() {
259        flags |= 0x08;
260    }
261    flags |= phase_code & 0x07;
262    buf.put_u8(flags);
263
264    // Address
265    encode_string(buf, &msg.address)?;
266
267    // Value/payload
268    if let Some(ref value) = msg.value {
269        buf.put_u8(1); // has value
270        buf.put_u8(value_type_code(value));
271        encode_value_data(buf, value)?;
272    } else if let Some(ref payload) = msg.payload {
273        buf.put_u8(1); // has payload
274        buf.put_u8(value_type_code(payload));
275        encode_value_data(buf, payload)?;
276    } else if let Some(ref samples) = msg.samples {
277        buf.put_u8(2); // has samples
278        buf.put_u16(samples.len() as u16);
279        for sample in samples {
280            buf.put_f64(*sample);
281        }
282    } else {
283        buf.put_u8(0); // no value
284    }
285
286    // Optional timestamp
287    if let Some(ts) = msg.timestamp {
288        buf.put_u64(ts);
289    }
290
291    // Optional gesture ID
292    if let Some(id) = msg.id {
293        buf.put_u32(id);
294    }
295
296    // Optional rate
297    if let Some(rate) = msg.rate {
298        buf.put_u32(rate);
299    }
300
301    Ok(())
302}
303
304/// HELLO (0x01)
305fn encode_hello(buf: &mut BytesMut, msg: &HelloMessage) -> Result<()> {
306    buf.put_u8(msg::HELLO);
307    buf.put_u8(msg.version);
308
309    // Feature flags
310    let mut features: u8 = 0;
311    for f in &msg.features {
312        match f.as_str() {
313            "param" => features |= 0x80,
314            "event" => features |= 0x40,
315            "stream" => features |= 0x20,
316            "gesture" => features |= 0x10,
317            "timeline" => features |= 0x08,
318            _ => {}
319        }
320    }
321    buf.put_u8(features);
322
323    // Name
324    encode_string(buf, &msg.name)?;
325
326    // Token (optional)
327    if let Some(ref token) = msg.token {
328        encode_string(buf, token)?;
329    } else {
330        buf.put_u16(0);
331    }
332
333    Ok(())
334}
335
336/// WELCOME (0x02)
337fn encode_welcome(buf: &mut BytesMut, msg: &WelcomeMessage) -> Result<()> {
338    buf.put_u8(msg::WELCOME);
339    buf.put_u8(msg.version);
340
341    // Feature flags (same as HELLO)
342    let mut features: u8 = 0;
343    for f in &msg.features {
344        match f.as_str() {
345            "param" => features |= 0x80,
346            "event" => features |= 0x40,
347            "stream" => features |= 0x20,
348            "gesture" => features |= 0x10,
349            "timeline" => features |= 0x08,
350            _ => {}
351        }
352    }
353    buf.put_u8(features);
354
355    // Server time
356    buf.put_u64(msg.time);
357
358    // Session ID
359    encode_string(buf, &msg.session)?;
360
361    // Server name
362    encode_string(buf, &msg.name)?;
363
364    // Token (optional)
365    if let Some(ref token) = msg.token {
366        encode_string(buf, token)?;
367    } else {
368        buf.put_u16(0);
369    }
370
371    Ok(())
372}
373
374/// ANNOUNCE (0x03)
375fn encode_announce(buf: &mut BytesMut, msg: &AnnounceMessage) -> Result<()> {
376    buf.put_u8(msg::ANNOUNCE);
377
378    encode_string(buf, &msg.namespace)?;
379    buf.put_u16(msg.signals.len() as u16);
380
381    for sig in &msg.signals {
382        encode_string(buf, &sig.address)?;
383        buf.put_u8(signal_type_code(sig.signal_type));
384
385        // Optional fields flags
386        let mut opt_flags: u8 = 0;
387        if sig.datatype.is_some() {
388            opt_flags |= 0x01;
389        }
390        if sig.access.is_some() {
391            opt_flags |= 0x02;
392        }
393        if sig.meta.is_some() {
394            opt_flags |= 0x04;
395        }
396        buf.put_u8(opt_flags);
397
398        if let Some(ref dt) = sig.datatype {
399            encode_string(buf, dt)?;
400        }
401        if let Some(ref access) = sig.access {
402            encode_string(buf, access)?;
403        }
404        if let Some(ref meta) = sig.meta {
405            // Encode meta as simple fields
406            let mut meta_flags: u8 = 0;
407            if meta.unit.is_some() {
408                meta_flags |= 0x01;
409            }
410            if meta.range.is_some() {
411                meta_flags |= 0x02;
412            }
413            if meta.default.is_some() {
414                meta_flags |= 0x04;
415            }
416            if meta.description.is_some() {
417                meta_flags |= 0x08;
418            }
419            buf.put_u8(meta_flags);
420
421            if let Some(ref unit) = meta.unit {
422                encode_string(buf, unit)?;
423            }
424            if let Some((min, max)) = meta.range {
425                buf.put_f64(min);
426                buf.put_f64(max);
427            }
428            if let Some(ref default) = meta.default {
429                buf.put_u8(value_type_code(default));
430                encode_value_data(buf, default)?;
431            }
432            if let Some(ref desc) = meta.description {
433                encode_string(buf, desc)?;
434            }
435        }
436    }
437
438    Ok(())
439}
440
441/// SUBSCRIBE (0x10)
442fn encode_subscribe(buf: &mut BytesMut, msg: &SubscribeMessage) -> Result<()> {
443    buf.put_u8(msg::SUBSCRIBE);
444    buf.put_u32(msg.id);
445
446    encode_string(buf, &msg.pattern)?;
447
448    // Signal type filter as bitmask
449    let mut type_mask: u8 = 0;
450    if msg.types.is_empty() {
451        type_mask = 0xFF; // All types
452    } else {
453        for t in &msg.types {
454            match t {
455                SignalType::Param => type_mask |= 0x01,
456                SignalType::Event => type_mask |= 0x02,
457                SignalType::Stream => type_mask |= 0x04,
458                SignalType::Gesture => type_mask |= 0x08,
459                SignalType::Timeline => type_mask |= 0x10,
460            }
461        }
462    }
463    buf.put_u8(type_mask);
464
465    // Options
466    if let Some(ref opts) = msg.options {
467        let mut opt_flags: u8 = 0;
468        if opts.max_rate.is_some() {
469            opt_flags |= 0x01;
470        }
471        if opts.epsilon.is_some() {
472            opt_flags |= 0x02;
473        }
474        if opts.history.is_some() {
475            opt_flags |= 0x04;
476        }
477        if opts.window.is_some() {
478            opt_flags |= 0x08;
479        }
480        buf.put_u8(opt_flags);
481
482        if let Some(rate) = opts.max_rate {
483            buf.put_u32(rate);
484        }
485        if let Some(eps) = opts.epsilon {
486            buf.put_f64(eps);
487        }
488        if let Some(hist) = opts.history {
489            buf.put_u32(hist);
490        }
491        if let Some(win) = opts.window {
492            buf.put_u32(win);
493        }
494    } else {
495        buf.put_u8(0); // No options
496    }
497
498    Ok(())
499}
500
501/// UNSUBSCRIBE (0x11)
502fn encode_unsubscribe(buf: &mut BytesMut, msg: &UnsubscribeMessage) -> Result<()> {
503    buf.put_u8(msg::UNSUBSCRIBE);
504    buf.put_u32(msg.id);
505    Ok(())
506}
507
508/// GET (0x22)
509fn encode_get(buf: &mut BytesMut, msg: &GetMessage) -> Result<()> {
510    buf.put_u8(msg::GET);
511    encode_string(buf, &msg.address)?;
512    Ok(())
513}
514
515/// SNAPSHOT (0x23)
516fn encode_snapshot(buf: &mut BytesMut, msg: &SnapshotMessage) -> Result<()> {
517    buf.put_u8(msg::SNAPSHOT);
518    buf.put_u16(msg.params.len() as u16);
519
520    for param in &msg.params {
521        encode_string(buf, &param.address)?;
522        buf.put_u8(value_type_code(&param.value));
523        encode_value_data(buf, &param.value)?;
524        buf.put_u64(param.revision);
525
526        let mut opt_flags: u8 = 0;
527        if param.writer.is_some() {
528            opt_flags |= 0x01;
529        }
530        if param.timestamp.is_some() {
531            opt_flags |= 0x02;
532        }
533        buf.put_u8(opt_flags);
534
535        if let Some(ref writer) = param.writer {
536            encode_string(buf, writer)?;
537        }
538        if let Some(ts) = param.timestamp {
539            buf.put_u64(ts);
540        }
541    }
542
543    Ok(())
544}
545
546/// BUNDLE (0x30)
547fn encode_bundle(buf: &mut BytesMut, msg: &BundleMessage) -> Result<()> {
548    buf.put_u8(msg::BUNDLE);
549
550    let mut flags: u8 = 0;
551    if msg.timestamp.is_some() {
552        flags |= 0x80;
553    }
554    buf.put_u8(flags);
555
556    buf.put_u16(msg.messages.len() as u16);
557
558    if let Some(ts) = msg.timestamp {
559        buf.put_u64(ts);
560    }
561
562    // Each message prefixed with length
563    for inner_msg in &msg.messages {
564        let mut inner_buf = BytesMut::with_capacity(64);
565        encode_message_to_buf(&mut inner_buf, inner_msg)?;
566        buf.put_u16(inner_buf.len() as u16);
567        buf.extend_from_slice(&inner_buf);
568    }
569
570    Ok(())
571}
572
573/// SYNC (0x40)
574fn encode_sync(buf: &mut BytesMut, msg: &SyncMessage) -> Result<()> {
575    buf.put_u8(msg::SYNC);
576
577    let mut flags: u8 = 0;
578    if msg.t2.is_some() {
579        flags |= 0x01;
580    }
581    if msg.t3.is_some() {
582        flags |= 0x02;
583    }
584    buf.put_u8(flags);
585
586    buf.put_u64(msg.t1);
587    if let Some(t2) = msg.t2 {
588        buf.put_u64(t2);
589    }
590    if let Some(t3) = msg.t3 {
591        buf.put_u64(t3);
592    }
593
594    Ok(())
595}
596
597/// ACK (0x50)
598fn encode_ack(buf: &mut BytesMut, msg: &AckMessage) -> Result<()> {
599    buf.put_u8(msg::ACK);
600
601    let mut flags: u8 = 0;
602    if msg.address.is_some() {
603        flags |= 0x01;
604    }
605    if msg.revision.is_some() {
606        flags |= 0x02;
607    }
608    if msg.locked.is_some() {
609        flags |= 0x04;
610    }
611    if msg.holder.is_some() {
612        flags |= 0x08;
613    }
614    if msg.correlation_id.is_some() {
615        flags |= 0x10;
616    }
617    buf.put_u8(flags);
618
619    if let Some(ref addr) = msg.address {
620        encode_string(buf, addr)?;
621    }
622    if let Some(rev) = msg.revision {
623        buf.put_u64(rev);
624    }
625    if let Some(locked) = msg.locked {
626        buf.put_u8(if locked { 1 } else { 0 });
627    }
628    if let Some(ref holder) = msg.holder {
629        encode_string(buf, holder)?;
630    }
631    if let Some(corr) = msg.correlation_id {
632        buf.put_u32(corr);
633    }
634
635    Ok(())
636}
637
638/// ERROR (0x51)
639fn encode_error(buf: &mut BytesMut, msg: &ErrorMessage) -> Result<()> {
640    buf.put_u8(msg::ERROR);
641    buf.put_u16(msg.code);
642    encode_string(buf, &msg.message)?;
643
644    let mut flags: u8 = 0;
645    if msg.address.is_some() {
646        flags |= 0x01;
647    }
648    if msg.correlation_id.is_some() {
649        flags |= 0x02;
650    }
651    buf.put_u8(flags);
652
653    if let Some(ref addr) = msg.address {
654        encode_string(buf, addr)?;
655    }
656    if let Some(corr) = msg.correlation_id {
657        buf.put_u32(corr);
658    }
659
660    Ok(())
661}
662
663/// QUERY (0x60)
664fn encode_query(buf: &mut BytesMut, msg: &QueryMessage) -> Result<()> {
665    buf.put_u8(msg::QUERY);
666    encode_string(buf, &msg.pattern)?;
667    Ok(())
668}
669
670/// RESULT (0x61)
671fn encode_result(buf: &mut BytesMut, msg: &ResultMessage) -> Result<()> {
672    buf.put_u8(msg::RESULT);
673    buf.put_u16(msg.signals.len() as u16);
674
675    for sig in &msg.signals {
676        encode_string(buf, &sig.address)?;
677        buf.put_u8(signal_type_code(sig.signal_type));
678
679        let mut opt_flags: u8 = 0;
680        if sig.datatype.is_some() {
681            opt_flags |= 0x01;
682        }
683        if sig.access.is_some() {
684            opt_flags |= 0x02;
685        }
686        buf.put_u8(opt_flags);
687
688        if let Some(ref dt) = sig.datatype {
689            encode_string(buf, dt)?;
690        }
691        if let Some(ref access) = sig.access {
692            encode_string(buf, access)?;
693        }
694    }
695
696    Ok(())
697}
698
699// ============================================================================
700// VALUE ENCODING HELPERS
701// ============================================================================
702
703#[inline(always)]
704fn encode_string(buf: &mut BytesMut, s: &str) -> Result<()> {
705    let bytes = s.as_bytes();
706    if bytes.len() > u16::MAX as usize {
707        return Err(Error::PayloadTooLarge(bytes.len()));
708    }
709    buf.put_u16(bytes.len() as u16);
710    buf.extend_from_slice(bytes);
711    Ok(())
712}
713
714#[inline]
715fn encode_value_data(buf: &mut BytesMut, value: &Value) -> Result<()> {
716    match value {
717        Value::Null => {} // Type code is enough
718        Value::Bool(b) => buf.put_u8(if *b { 1 } else { 0 }),
719        Value::Int(i) => buf.put_i64(*i),
720        Value::Float(f) => buf.put_f64(*f),
721        Value::String(s) => encode_string(buf, s)?,
722        Value::Bytes(b) => {
723            buf.put_u16(b.len() as u16);
724            buf.extend_from_slice(b);
725        }
726        Value::Array(arr) => {
727            buf.put_u16(arr.len() as u16);
728            for item in arr {
729                buf.put_u8(value_type_code(item));
730                encode_value_data(buf, item)?;
731            }
732        }
733        Value::Map(map) => {
734            buf.put_u16(map.len() as u16);
735            for (key, val) in map {
736                encode_string(buf, key)?;
737                buf.put_u8(value_type_code(val));
738                encode_value_data(buf, val)?;
739            }
740        }
741    }
742    Ok(())
743}
744
745#[inline(always)]
746fn value_type_code(value: &Value) -> u8 {
747    match value {
748        Value::Null => val::NULL,
749        Value::Bool(_) => val::BOOL,
750        Value::Int(_) => val::I64,
751        Value::Float(_) => val::F64,
752        Value::String(_) => val::STRING,
753        Value::Bytes(_) => val::BYTES,
754        Value::Array(_) => val::ARRAY,
755        Value::Map(_) => val::MAP,
756    }
757}
758
759fn signal_type_code(sig: SignalType) -> u8 {
760    match sig {
761        SignalType::Param => sig::PARAM,
762        SignalType::Event => sig::EVENT,
763        SignalType::Stream => sig::STREAM,
764        SignalType::Gesture => sig::GESTURE,
765        SignalType::Timeline => sig::TIMELINE,
766    }
767}
768
769fn gesture_phase_code(phase: GesturePhase) -> u8 {
770    match phase {
771        GesturePhase::Start => phase::START,
772        GesturePhase::Move => phase::MOVE,
773        GesturePhase::End => phase::END,
774        GesturePhase::Cancel => phase::CANCEL,
775    }
776}
777
778// ============================================================================
779// BINARY DECODING
780// ============================================================================
781
782fn decode_v3_binary(bytes: &[u8]) -> Result<Message> {
783    if bytes.is_empty() {
784        return Err(Error::BufferTooSmall { needed: 1, have: 0 });
785    }
786
787    let mut buf = bytes;
788    let msg_type = buf.get_u8();
789
790    match msg_type {
791        msg::HELLO => decode_hello(&mut buf),
792        msg::WELCOME => decode_welcome(&mut buf),
793        msg::ANNOUNCE => decode_announce(&mut buf),
794        msg::SUBSCRIBE => decode_subscribe(&mut buf),
795        msg::UNSUBSCRIBE => decode_unsubscribe(&mut buf),
796        msg::PUBLISH => decode_publish(&mut buf),
797        msg::SET => decode_set(&mut buf),
798        msg::GET => decode_get(&mut buf),
799        msg::SNAPSHOT => decode_snapshot(&mut buf),
800        msg::BUNDLE => decode_bundle(&mut buf),
801        msg::SYNC => decode_sync(&mut buf),
802        msg::PING => Ok(Message::Ping),
803        msg::PONG => Ok(Message::Pong),
804        msg::ACK => decode_ack(&mut buf),
805        msg::ERROR => decode_error(&mut buf),
806        msg::QUERY => decode_query(&mut buf),
807        msg::RESULT => decode_result(&mut buf),
808        _ => Err(Error::UnknownMessageType(msg_type)),
809    }
810}
811
812#[inline]
813fn decode_set(buf: &mut &[u8]) -> Result<Message> {
814    let flags = buf.get_u8();
815    let vtype = flags & 0x0F;
816    let has_rev = (flags & 0x80) != 0;
817    let lock = (flags & 0x40) != 0;
818    let unlock = (flags & 0x20) != 0;
819
820    let address = decode_string(buf)?;
821    let value = decode_value_data(buf, vtype)?;
822
823    let revision = if has_rev { Some(buf.get_u64()) } else { None };
824
825    Ok(Message::Set(SetMessage {
826        address,
827        value,
828        revision,
829        lock,
830        unlock,
831    }))
832}
833
834fn decode_publish(buf: &mut &[u8]) -> Result<Message> {
835    let flags = buf.get_u8();
836    let sig_code = (flags >> 5) & 0x07;
837    let has_ts = (flags & 0x10) != 0;
838    let has_id = (flags & 0x08) != 0;
839    let phase_code = flags & 0x07;
840
841    let address = decode_string(buf)?;
842
843    // Value indicator
844    let value_indicator = buf.get_u8();
845    let (value, payload, samples) = match value_indicator {
846        0 => (None, None, None),
847        1 => {
848            let vtype = buf.get_u8();
849            let v = decode_value_data(buf, vtype)?;
850            (Some(v), None, None)
851        }
852        2 => {
853            let count = buf.get_u16() as usize;
854            let mut s = Vec::with_capacity(count);
855            for _ in 0..count {
856                s.push(buf.get_f64());
857            }
858            (None, None, Some(s))
859        }
860        _ => (None, None, None),
861    };
862
863    let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
864    let id = if has_id { Some(buf.get_u32()) } else { None };
865
866    // Rate (if remaining bytes)
867    let rate = if buf.remaining() >= 4 {
868        Some(buf.get_u32())
869    } else {
870        None
871    };
872
873    let signal = Some(signal_type_from_code(sig_code));
874    let phase = Some(gesture_phase_from_code(phase_code));
875
876    Ok(Message::Publish(PublishMessage {
877        address,
878        signal,
879        value,
880        payload,
881        samples,
882        rate,
883        id,
884        phase,
885        timestamp,
886        timeline: None, // Timeline data is encoded separately when signal is Timeline
887    }))
888}
889
890fn decode_hello(buf: &mut &[u8]) -> Result<Message> {
891    let version = buf.get_u8();
892    let feature_flags = buf.get_u8();
893
894    let mut features = Vec::new();
895    if feature_flags & 0x80 != 0 {
896        features.push("param".to_string());
897    }
898    if feature_flags & 0x40 != 0 {
899        features.push("event".to_string());
900    }
901    if feature_flags & 0x20 != 0 {
902        features.push("stream".to_string());
903    }
904    if feature_flags & 0x10 != 0 {
905        features.push("gesture".to_string());
906    }
907    if feature_flags & 0x08 != 0 {
908        features.push("timeline".to_string());
909    }
910
911    let name = decode_string(buf)?;
912    let token_str = decode_string(buf)?;
913    let token = if token_str.is_empty() {
914        None
915    } else {
916        Some(token_str)
917    };
918
919    Ok(Message::Hello(HelloMessage {
920        version,
921        name,
922        features,
923        capabilities: None,
924        token,
925    }))
926}
927
928fn decode_welcome(buf: &mut &[u8]) -> Result<Message> {
929    let version = buf.get_u8();
930    let feature_flags = buf.get_u8();
931
932    let mut features = Vec::new();
933    if feature_flags & 0x80 != 0 {
934        features.push("param".to_string());
935    }
936    if feature_flags & 0x40 != 0 {
937        features.push("event".to_string());
938    }
939    if feature_flags & 0x20 != 0 {
940        features.push("stream".to_string());
941    }
942    if feature_flags & 0x10 != 0 {
943        features.push("gesture".to_string());
944    }
945    if feature_flags & 0x08 != 0 {
946        features.push("timeline".to_string());
947    }
948
949    let time = buf.get_u64();
950    let session = decode_string(buf)?;
951    let name = decode_string(buf)?;
952
953    let token_str = decode_string(buf)?;
954    let token = if token_str.is_empty() {
955        None
956    } else {
957        Some(token_str)
958    };
959
960    Ok(Message::Welcome(WelcomeMessage {
961        version,
962        session,
963        name,
964        features,
965        time,
966        token,
967    }))
968}
969
970fn decode_announce(buf: &mut &[u8]) -> Result<Message> {
971    let namespace = decode_string(buf)?;
972    let count = buf.get_u16() as usize;
973
974    let mut signals = Vec::with_capacity(count);
975    for _ in 0..count {
976        let address = decode_string(buf)?;
977        let sig_code = buf.get_u8();
978        let opt_flags = buf.get_u8();
979
980        let datatype = if opt_flags & 0x01 != 0 {
981            Some(decode_string(buf)?)
982        } else {
983            None
984        };
985        let access = if opt_flags & 0x02 != 0 {
986            Some(decode_string(buf)?)
987        } else {
988            None
989        };
990
991        let meta = if opt_flags & 0x04 != 0 {
992            let meta_flags = buf.get_u8();
993
994            let unit = if meta_flags & 0x01 != 0 {
995                Some(decode_string(buf)?)
996            } else {
997                None
998            };
999            let range = if meta_flags & 0x02 != 0 {
1000                let min = buf.get_f64();
1001                let max = buf.get_f64();
1002                Some((min, max))
1003            } else {
1004                None
1005            };
1006            let default = if meta_flags & 0x04 != 0 {
1007                let vtype = buf.get_u8();
1008                Some(decode_value_data(buf, vtype)?)
1009            } else {
1010                None
1011            };
1012            let description = if meta_flags & 0x08 != 0 {
1013                Some(decode_string(buf)?)
1014            } else {
1015                None
1016            };
1017
1018            Some(SignalMeta {
1019                unit,
1020                range,
1021                default,
1022                description,
1023            })
1024        } else {
1025            None
1026        };
1027
1028        signals.push(SignalDefinition {
1029            address,
1030            signal_type: signal_type_from_code(sig_code),
1031            datatype,
1032            access,
1033            meta,
1034        });
1035    }
1036
1037    Ok(Message::Announce(AnnounceMessage {
1038        namespace,
1039        signals,
1040        meta: None,
1041    }))
1042}
1043
1044fn decode_subscribe(buf: &mut &[u8]) -> Result<Message> {
1045    let id = buf.get_u32();
1046    let pattern = decode_string(buf)?;
1047    let type_mask = buf.get_u8();
1048
1049    let mut types = Vec::new();
1050    if type_mask == 0xFF {
1051        // All types, leave empty to indicate all
1052    } else {
1053        if type_mask & 0x01 != 0 {
1054            types.push(SignalType::Param);
1055        }
1056        if type_mask & 0x02 != 0 {
1057            types.push(SignalType::Event);
1058        }
1059        if type_mask & 0x04 != 0 {
1060            types.push(SignalType::Stream);
1061        }
1062        if type_mask & 0x08 != 0 {
1063            types.push(SignalType::Gesture);
1064        }
1065        if type_mask & 0x10 != 0 {
1066            types.push(SignalType::Timeline);
1067        }
1068    }
1069
1070    let opt_flags = buf.get_u8();
1071    let options = if opt_flags != 0 {
1072        let max_rate = if opt_flags & 0x01 != 0 {
1073            Some(buf.get_u32())
1074        } else {
1075            None
1076        };
1077        let epsilon = if opt_flags & 0x02 != 0 {
1078            Some(buf.get_f64())
1079        } else {
1080            None
1081        };
1082        let history = if opt_flags & 0x04 != 0 {
1083            Some(buf.get_u32())
1084        } else {
1085            None
1086        };
1087        let window = if opt_flags & 0x08 != 0 {
1088            Some(buf.get_u32())
1089        } else {
1090            None
1091        };
1092
1093        Some(SubscribeOptions {
1094            max_rate,
1095            epsilon,
1096            history,
1097            window,
1098        })
1099    } else {
1100        None
1101    };
1102
1103    Ok(Message::Subscribe(SubscribeMessage {
1104        id,
1105        pattern,
1106        types,
1107        options,
1108    }))
1109}
1110
1111fn decode_unsubscribe(buf: &mut &[u8]) -> Result<Message> {
1112    let id = buf.get_u32();
1113    Ok(Message::Unsubscribe(UnsubscribeMessage { id }))
1114}
1115
1116fn decode_get(buf: &mut &[u8]) -> Result<Message> {
1117    let address = decode_string(buf)?;
1118    Ok(Message::Get(GetMessage { address }))
1119}
1120
1121fn decode_snapshot(buf: &mut &[u8]) -> Result<Message> {
1122    let count = buf.get_u16() as usize;
1123    let mut params = Vec::with_capacity(count);
1124
1125    for _ in 0..count {
1126        let address = decode_string(buf)?;
1127        let vtype = buf.get_u8();
1128        let value = decode_value_data(buf, vtype)?;
1129        let revision = buf.get_u64();
1130        let opt_flags = buf.get_u8();
1131
1132        let writer = if opt_flags & 0x01 != 0 {
1133            Some(decode_string(buf)?)
1134        } else {
1135            None
1136        };
1137        let timestamp = if opt_flags & 0x02 != 0 {
1138            Some(buf.get_u64())
1139        } else {
1140            None
1141        };
1142
1143        params.push(ParamValue {
1144            address,
1145            value,
1146            revision,
1147            writer,
1148            timestamp,
1149        });
1150    }
1151
1152    Ok(Message::Snapshot(SnapshotMessage { params }))
1153}
1154
1155fn decode_bundle(buf: &mut &[u8]) -> Result<Message> {
1156    let flags = buf.get_u8();
1157    let has_ts = (flags & 0x80) != 0;
1158    let count = buf.get_u16() as usize;
1159
1160    let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
1161
1162    let mut messages = Vec::with_capacity(count);
1163    for _ in 0..count {
1164        let len = buf.get_u16() as usize;
1165        let inner_bytes = &buf[..len];
1166        buf.advance(len);
1167        messages.push(decode_v3_binary(inner_bytes)?);
1168    }
1169
1170    Ok(Message::Bundle(BundleMessage {
1171        timestamp,
1172        messages,
1173    }))
1174}
1175
1176fn decode_sync(buf: &mut &[u8]) -> Result<Message> {
1177    let flags = buf.get_u8();
1178    let t1 = buf.get_u64();
1179    let t2 = if flags & 0x01 != 0 {
1180        Some(buf.get_u64())
1181    } else {
1182        None
1183    };
1184    let t3 = if flags & 0x02 != 0 {
1185        Some(buf.get_u64())
1186    } else {
1187        None
1188    };
1189
1190    Ok(Message::Sync(SyncMessage { t1, t2, t3 }))
1191}
1192
1193fn decode_ack(buf: &mut &[u8]) -> Result<Message> {
1194    let flags = buf.get_u8();
1195
1196    let address = if flags & 0x01 != 0 {
1197        Some(decode_string(buf)?)
1198    } else {
1199        None
1200    };
1201    let revision = if flags & 0x02 != 0 {
1202        Some(buf.get_u64())
1203    } else {
1204        None
1205    };
1206    let locked = if flags & 0x04 != 0 {
1207        Some(buf.get_u8() != 0)
1208    } else {
1209        None
1210    };
1211    let holder = if flags & 0x08 != 0 {
1212        Some(decode_string(buf)?)
1213    } else {
1214        None
1215    };
1216    let correlation_id = if flags & 0x10 != 0 {
1217        Some(buf.get_u32())
1218    } else {
1219        None
1220    };
1221
1222    Ok(Message::Ack(AckMessage {
1223        address,
1224        revision,
1225        locked,
1226        holder,
1227        correlation_id,
1228    }))
1229}
1230
1231fn decode_error(buf: &mut &[u8]) -> Result<Message> {
1232    let code = buf.get_u16();
1233    let message = decode_string(buf)?;
1234    let flags = buf.get_u8();
1235
1236    let address = if flags & 0x01 != 0 {
1237        Some(decode_string(buf)?)
1238    } else {
1239        None
1240    };
1241    let correlation_id = if flags & 0x02 != 0 {
1242        Some(buf.get_u32())
1243    } else {
1244        None
1245    };
1246
1247    Ok(Message::Error(ErrorMessage {
1248        code,
1249        message,
1250        address,
1251        correlation_id,
1252    }))
1253}
1254
1255fn decode_query(buf: &mut &[u8]) -> Result<Message> {
1256    let pattern = decode_string(buf)?;
1257    Ok(Message::Query(QueryMessage { pattern }))
1258}
1259
1260fn decode_result(buf: &mut &[u8]) -> Result<Message> {
1261    let count = buf.get_u16() as usize;
1262    let mut signals = Vec::with_capacity(count);
1263
1264    for _ in 0..count {
1265        let address = decode_string(buf)?;
1266        let sig_code = buf.get_u8();
1267        let opt_flags = buf.get_u8();
1268
1269        let datatype = if opt_flags & 0x01 != 0 {
1270            Some(decode_string(buf)?)
1271        } else {
1272            None
1273        };
1274        let access = if opt_flags & 0x02 != 0 {
1275            Some(decode_string(buf)?)
1276        } else {
1277            None
1278        };
1279
1280        signals.push(SignalDefinition {
1281            address,
1282            signal_type: signal_type_from_code(sig_code),
1283            datatype,
1284            access,
1285            meta: None,
1286        });
1287    }
1288
1289    Ok(Message::Result(ResultMessage { signals }))
1290}
1291
1292// ============================================================================
1293// VALUE DECODING HELPERS
1294// ============================================================================
1295
1296#[inline(always)]
1297fn decode_string(buf: &mut &[u8]) -> Result<String> {
1298    if buf.remaining() < 2 {
1299        return Err(Error::BufferTooSmall {
1300            needed: 2,
1301            have: buf.remaining(),
1302        });
1303    }
1304    let len = buf.get_u16() as usize;
1305    if buf.remaining() < len {
1306        return Err(Error::BufferTooSmall {
1307            needed: len,
1308            have: buf.remaining(),
1309        });
1310    }
1311    let bytes = &buf[..len];
1312    buf.advance(len);
1313    String::from_utf8(bytes.to_vec()).map_err(|e| Error::DecodeError(e.to_string()))
1314}
1315
1316#[inline]
1317fn decode_value_data(buf: &mut &[u8], vtype: u8) -> Result<Value> {
1318    match vtype {
1319        val::NULL => Ok(Value::Null),
1320        val::BOOL => {
1321            let b = buf.get_u8();
1322            Ok(Value::Bool(b != 0))
1323        }
1324        val::I8 => {
1325            let i = buf.get_i8() as i64;
1326            Ok(Value::Int(i))
1327        }
1328        val::I16 => {
1329            let i = buf.get_i16() as i64;
1330            Ok(Value::Int(i))
1331        }
1332        val::I32 => {
1333            let i = buf.get_i32() as i64;
1334            Ok(Value::Int(i))
1335        }
1336        val::I64 => {
1337            let i = buf.get_i64();
1338            Ok(Value::Int(i))
1339        }
1340        val::F32 => {
1341            let f = buf.get_f32() as f64;
1342            Ok(Value::Float(f))
1343        }
1344        val::F64 => {
1345            let f = buf.get_f64();
1346            Ok(Value::Float(f))
1347        }
1348        val::STRING => {
1349            let s = decode_string(buf)?;
1350            Ok(Value::String(s))
1351        }
1352        val::BYTES => {
1353            if buf.remaining() < 2 {
1354                return Err(Error::BufferTooSmall {
1355                    needed: 2,
1356                    have: buf.remaining(),
1357                });
1358            }
1359            let len = buf.get_u16() as usize;
1360            if buf.remaining() < len {
1361                return Err(Error::BufferTooSmall {
1362                    needed: len,
1363                    have: buf.remaining(),
1364                });
1365            }
1366            let bytes = buf[..len].to_vec();
1367            buf.advance(len);
1368            Ok(Value::Bytes(bytes))
1369        }
1370        val::ARRAY => {
1371            let count = buf.get_u16() as usize;
1372            let mut arr = Vec::with_capacity(count);
1373            for _ in 0..count {
1374                let item_type = buf.get_u8();
1375                arr.push(decode_value_data(buf, item_type)?);
1376            }
1377            Ok(Value::Array(arr))
1378        }
1379        val::MAP => {
1380            let count = buf.get_u16() as usize;
1381            let mut map = HashMap::with_capacity(count);
1382            for _ in 0..count {
1383                let key = decode_string(buf)?;
1384                let val_type = buf.get_u8();
1385                let val = decode_value_data(buf, val_type)?;
1386                map.insert(key, val);
1387            }
1388            Ok(Value::Map(map))
1389        }
1390        _ => Err(Error::DecodeError(format!(
1391            "unknown value type: 0x{:02x}",
1392            vtype
1393        ))),
1394    }
1395}
1396
1397fn signal_type_from_code(code: u8) -> SignalType {
1398    match code {
1399        sig::PARAM => SignalType::Param,
1400        sig::EVENT => SignalType::Event,
1401        sig::STREAM => SignalType::Stream,
1402        sig::GESTURE => SignalType::Gesture,
1403        sig::TIMELINE => SignalType::Timeline,
1404        _ => SignalType::Event, // Default
1405    }
1406}
1407
1408fn gesture_phase_from_code(code: u8) -> GesturePhase {
1409    match code {
1410        phase::START => GesturePhase::Start,
1411        phase::MOVE => GesturePhase::Move,
1412        phase::END => GesturePhase::End,
1413        phase::CANCEL => GesturePhase::Cancel,
1414        _ => GesturePhase::Start, // Default
1415    }
1416}
1417
1418// ============================================================================
1419// V2 MESSAGEPACK DECODING (BACKWARD COMPATIBILITY)
1420// ============================================================================
1421
1422fn is_msgpack_map(byte: u8) -> bool {
1423    // fixmap: 0x80-0x8F, map16: 0xDE, map32: 0xDF
1424    (byte & 0xF0) == 0x80 || byte == 0xDE || byte == 0xDF
1425}
1426
1427fn decode_v2_msgpack(bytes: &[u8]) -> Result<Message> {
1428    rmp_serde::from_slice(bytes).map_err(|e| Error::DecodeError(e.to_string()))
1429}
1430
1431// ============================================================================
1432// TESTS
1433// ============================================================================
1434
1435#[cfg(test)]
1436mod tests {
1437    use super::*;
1438
1439    #[test]
1440    fn test_hello_roundtrip() {
1441        let msg = Message::Hello(HelloMessage {
1442            version: 1,
1443            name: "Test Client".to_string(),
1444            features: vec!["param".to_string(), "event".to_string()],
1445            capabilities: None,
1446            token: None,
1447        });
1448
1449        let encoded = encode(&msg).unwrap();
1450        let (decoded, frame) = decode(&encoded).unwrap();
1451
1452        match decoded {
1453            Message::Hello(hello) => {
1454                assert_eq!(hello.version, 1);
1455                assert_eq!(hello.name, "Test Client");
1456                assert!(hello.features.contains(&"param".to_string()));
1457                assert!(hello.features.contains(&"event".to_string()));
1458            }
1459            _ => panic!("Expected Hello message"),
1460        }
1461
1462        assert_eq!(frame.flags.qos, QoS::Fire);
1463        assert_eq!(frame.flags.version, 1); // binary encoding
1464    }
1465
1466    #[test]
1467    fn test_set_roundtrip() {
1468        let msg = Message::Set(SetMessage {
1469            address: "/test/value".to_string(),
1470            value: Value::Float(0.75),
1471            revision: Some(42),
1472            lock: false,
1473            unlock: false,
1474        });
1475
1476        let encoded = encode(&msg).unwrap();
1477        let (decoded, frame) = decode(&encoded).unwrap();
1478
1479        match decoded {
1480            Message::Set(set) => {
1481                assert_eq!(set.address, "/test/value");
1482                assert_eq!(set.value.as_f64(), Some(0.75));
1483                assert_eq!(set.revision, Some(42));
1484            }
1485            _ => panic!("Expected Set message"),
1486        }
1487
1488        assert_eq!(frame.flags.qos, QoS::Confirm);
1489    }
1490
1491    #[test]
1492    fn test_set_size_reduction() {
1493        let msg = Message::Set(SetMessage {
1494            address: "/test/value".to_string(),
1495            value: Value::Float(0.5),
1496            revision: Some(1),
1497            lock: false,
1498            unlock: false,
1499        });
1500
1501        // Binary encoding
1502        let binary_payload = encode_message(&msg).unwrap();
1503
1504        // MessagePack encoding (named keys, legacy)
1505        let msgpack_payload = rmp_serde::to_vec_named(&msg).unwrap();
1506
1507        println!("Binary payload: {} bytes", binary_payload.len());
1508        println!("MessagePack payload: {} bytes", msgpack_payload.len());
1509
1510        // Binary encoding should be significantly smaller (target: ~32 bytes vs ~69 bytes)
1511        assert!(
1512            binary_payload.len() < msgpack_payload.len(),
1513            "Binary encoding ({}) should be smaller than MessagePack ({})",
1514            binary_payload.len(),
1515            msgpack_payload.len()
1516        );
1517
1518        // Binary encoding should be at least 40% smaller
1519        let savings = 100 - (binary_payload.len() * 100 / msgpack_payload.len());
1520        println!("Size reduction: {}%", savings);
1521        assert!(
1522            savings >= 40,
1523            "Expected at least 40% size reduction, got {}%",
1524            savings
1525        );
1526    }
1527
1528    #[test]
1529    fn test_bundle_roundtrip() {
1530        let msg = Message::Bundle(BundleMessage {
1531            timestamp: Some(1000000),
1532            messages: vec![
1533                Message::Set(SetMessage {
1534                    address: "/light/1".to_string(),
1535                    value: Value::Float(1.0),
1536                    revision: None,
1537                    lock: false,
1538                    unlock: false,
1539                }),
1540                Message::Set(SetMessage {
1541                    address: "/light/2".to_string(),
1542                    value: Value::Float(0.0),
1543                    revision: None,
1544                    lock: false,
1545                    unlock: false,
1546                }),
1547            ],
1548        });
1549
1550        let encoded = encode(&msg).unwrap();
1551        let (decoded, _) = decode(&encoded).unwrap();
1552
1553        match decoded {
1554            Message::Bundle(bundle) => {
1555                assert_eq!(bundle.timestamp, Some(1000000));
1556                assert_eq!(bundle.messages.len(), 2);
1557            }
1558            _ => panic!("Expected Bundle message"),
1559        }
1560    }
1561
1562    #[test]
1563    fn test_value_types() {
1564        let values = vec![
1565            Value::Null,
1566            Value::Bool(true),
1567            Value::Int(42),
1568            Value::Float(1.25),
1569            Value::String("hello".to_string()),
1570            Value::Array(vec![Value::Int(1), Value::Int(2), Value::Int(3)]),
1571        ];
1572
1573        for value in values {
1574            let msg = Message::Set(SetMessage {
1575                address: "/test".to_string(),
1576                value: value.clone(),
1577                revision: None,
1578                lock: false,
1579                unlock: false,
1580            });
1581
1582            let encoded = encode(&msg).unwrap();
1583            let (decoded, _) = decode(&encoded).unwrap();
1584
1585            match decoded {
1586                Message::Set(set) => {
1587                    assert_eq!(set.value, value);
1588                }
1589                _ => panic!("Expected Set message"),
1590            }
1591        }
1592    }
1593
1594    #[test]
1595    fn test_backward_compat_v2_decode() {
1596        // Create a v2 MessagePack encoded message
1597        let msg = SetMessage {
1598            address: "/test/value".to_string(),
1599            value: Value::Float(0.5),
1600            revision: Some(1),
1601            lock: false,
1602            unlock: false,
1603        };
1604
1605        // Encode as v2 (MessagePack with named keys)
1606        let v2_bytes = rmp_serde::to_vec_named(&Message::Set(msg.clone())).unwrap();
1607
1608        // Should still decode correctly
1609        let decoded = decode_message(&v2_bytes).unwrap();
1610
1611        match decoded {
1612            Message::Set(set) => {
1613                assert_eq!(set.address, "/test/value");
1614                assert_eq!(set.value.as_f64(), Some(0.5));
1615            }
1616            _ => panic!("Expected Set message"),
1617        }
1618    }
1619
1620    #[test]
1621    fn test_ping_pong() {
1622        let ping = encode(&Message::Ping).unwrap();
1623        let (decoded, _) = decode(&ping).unwrap();
1624        assert!(matches!(decoded, Message::Ping));
1625
1626        let pong = encode(&Message::Pong).unwrap();
1627        let (decoded, _) = decode(&pong).unwrap();
1628        assert!(matches!(decoded, Message::Pong));
1629    }
1630
1631    #[test]
1632    fn test_publish_event() {
1633        let msg = Message::Publish(PublishMessage {
1634            address: "/cue/fire".to_string(),
1635            signal: Some(SignalType::Event),
1636            value: None,
1637            payload: Some(Value::String("intro".to_string())),
1638            samples: None,
1639            rate: None,
1640            id: None,
1641            phase: None,
1642            timestamp: Some(1234567890),
1643            timeline: None,
1644        });
1645
1646        let encoded = encode(&msg).unwrap();
1647        let (decoded, _) = decode(&encoded).unwrap();
1648
1649        match decoded {
1650            Message::Publish(pub_msg) => {
1651                assert_eq!(pub_msg.address, "/cue/fire");
1652                assert_eq!(pub_msg.signal, Some(SignalType::Event));
1653                assert_eq!(pub_msg.timestamp, Some(1234567890));
1654            }
1655            _ => panic!("Expected Publish message"),
1656        }
1657    }
1658
1659    #[test]
1660    fn test_subscribe_roundtrip() {
1661        let msg = Message::Subscribe(SubscribeMessage {
1662            id: 42,
1663            pattern: "/lumen/scene/*/layer/**".to_string(),
1664            types: vec![SignalType::Param, SignalType::Stream],
1665            options: Some(SubscribeOptions {
1666                max_rate: Some(60),
1667                epsilon: Some(0.01),
1668                history: None,
1669                window: None,
1670            }),
1671        });
1672
1673        let encoded = encode(&msg).unwrap();
1674        let (decoded, _) = decode(&encoded).unwrap();
1675
1676        match decoded {
1677            Message::Subscribe(sub) => {
1678                assert_eq!(sub.id, 42);
1679                assert_eq!(sub.pattern, "/lumen/scene/*/layer/**");
1680                assert!(sub.types.contains(&SignalType::Param));
1681                assert!(sub.types.contains(&SignalType::Stream));
1682                assert_eq!(sub.options.as_ref().unwrap().max_rate, Some(60));
1683            }
1684            _ => panic!("Expected Subscribe message"),
1685        }
1686    }
1687}