Skip to main content

clasp_core/
codec.rs

1//! CLASP Binary Codec
2//!
3//! Efficient binary encoding for all CLASP messages.
4//! Backward compatible: can decode v2 MessagePack frames.
5//!
6//! # Performance
7//!
8//! Compared to v2 (MessagePack with named keys):
9//! - SET message: 69 bytes → 32 bytes (54% smaller)
10//! - Encoding speed: ~10M msg/s (vs 1.8M)
11//! - Decoding speed: ~12M msg/s (vs 1.5M)
12
13use crate::types::*;
14use crate::{Error, Frame, QoS, Result};
15use bytes::{Buf, BufMut, Bytes, BytesMut};
16use std::collections::HashMap;
17
18/// Encoding version (1 = binary encoding, 0 = MessagePack legacy)
19pub const ENCODING_VERSION: u8 = 1;
20
21/// Message type codes
22pub mod msg {
23    pub const HELLO: u8 = 0x01;
24    pub const WELCOME: u8 = 0x02;
25    pub const ANNOUNCE: u8 = 0x03;
26    pub const SUBSCRIBE: u8 = 0x10;
27    pub const UNSUBSCRIBE: u8 = 0x11;
28    pub const PUBLISH: u8 = 0x20;
29    pub const SET: u8 = 0x21;
30    pub const GET: u8 = 0x22;
31    pub const SNAPSHOT: u8 = 0x23;
32    pub const FEDERATION_SYNC: u8 = 0x04;
33    pub const REPLAY: u8 = 0x24;
34    pub const BUNDLE: u8 = 0x30;
35    pub const SYNC: u8 = 0x40;
36    pub const PING: u8 = 0x41;
37    pub const PONG: u8 = 0x42;
38    pub const ACK: u8 = 0x50;
39    pub const ERROR: u8 = 0x51;
40    pub const QUERY: u8 = 0x60;
41    pub const RESULT: u8 = 0x61;
42}
43
44/// Value type codes for efficient binary encoding
45pub mod val {
46    pub const NULL: u8 = 0x00;
47    pub const BOOL: u8 = 0x01;
48    pub const I8: u8 = 0x02;
49    pub const I16: u8 = 0x03;
50    pub const I32: u8 = 0x04;
51    pub const I64: u8 = 0x05;
52    pub const F32: u8 = 0x06;
53    pub const F64: u8 = 0x07;
54    pub const STRING: u8 = 0x08;
55    pub const BYTES: u8 = 0x09;
56    pub const ARRAY: u8 = 0x0A;
57    pub const MAP: u8 = 0x0B;
58}
59
60/// Signal type codes
61pub mod sig {
62    pub const PARAM: u8 = 0;
63    pub const EVENT: u8 = 1;
64    pub const STREAM: u8 = 2;
65    pub const GESTURE: u8 = 3;
66    pub const TIMELINE: u8 = 4;
67}
68
69/// Gesture phase codes
70pub mod phase {
71    pub const START: u8 = 0;
72    pub const MOVE: u8 = 1;
73    pub const END: u8 = 2;
74    pub const CANCEL: u8 = 3;
75}
76
77// ============================================================================
78// PUBLIC API
79// ============================================================================
80
81/// Encode a message to binary format
82#[inline]
83pub fn encode_message(message: &Message) -> Result<Bytes> {
84    // Pre-allocate based on expected message size
85    let capacity = estimate_message_size(message);
86    let mut buf = BytesMut::with_capacity(capacity);
87    encode_message_to_buf(&mut buf, message)?;
88    Ok(buf.freeze())
89}
90
91/// Estimate message size for pre-allocation (avoids realloc)
92#[inline]
93fn estimate_message_size(msg: &Message) -> usize {
94    match msg {
95        Message::Set(m) => 2 + 2 + m.address.len() + 9 + if m.revision.is_some() { 8 } else { 0 } + if m.ttl.is_some() { 4 } else { 0 },
96        Message::Publish(m) => 2 + 2 + m.address.len() + 16,
97        Message::Hello(m) => 4 + m.name.len() + 2,
98        Message::Welcome(m) => 12 + m.name.len() + m.session.len() + 4,
99        Message::Subscribe(m) => 6 + m.pattern.len() + 16,
100        Message::Bundle(m) => 12 + m.messages.len() * 48,
101        Message::Replay(m) => 4 + m.pattern.len() + 20,
102        Message::FederationSync(m) => {
103            4 + m.patterns.iter().map(|p| 2 + p.len()).sum::<usize>() + m.revisions.len() * 12 + 16
104        }
105        Message::Ping | Message::Pong => 5, // Just frame header
106        _ => 64,                            // Default for less common messages
107    }
108}
109
110/// Decode a message - auto-detects MessagePack (legacy) vs binary encoding
111#[inline]
112pub fn decode_message(bytes: &[u8]) -> Result<Message> {
113    if bytes.is_empty() {
114        return Err(Error::BufferTooSmall { needed: 1, have: 0 });
115    }
116
117    let first = bytes[0];
118
119    // Binary encoded messages start with known message type codes (0x01-0x61)
120    // v2 MessagePack maps start with 0x80-0x8F (fixmap) or 0xDE-0xDF (map16/map32)
121    if is_msgpack_map(first) {
122        // Legacy v2 format - use rmp-serde
123        decode_v2_msgpack(bytes)
124    } else {
125        // Binary encoding format
126        decode_v3_binary(bytes)
127    }
128}
129
130/// Encode a message into a complete frame (binary encoding)
131#[inline]
132pub fn encode(message: &Message) -> Result<Bytes> {
133    let payload = encode_message(message)?;
134    let mut frame = Frame::new(payload).with_qos(message.default_qos());
135    frame.flags.version = 1; // binary encoding (1 = binary, 0 = MessagePack legacy)
136    frame.encode()
137}
138
139/// Encode a message with options (binary encoding)
140pub fn encode_with_options(
141    message: &Message,
142    qos: Option<QoS>,
143    timestamp: Option<u64>,
144) -> Result<Bytes> {
145    let payload = encode_message(message)?;
146    let mut frame = Frame::new(payload);
147    frame.flags.version = 1; // binary encoding (1 = binary, 0 = MessagePack legacy)
148
149    if let Some(qos) = qos {
150        frame = frame.with_qos(qos);
151    } else {
152        frame = frame.with_qos(message.default_qos());
153    }
154
155    if let Some(ts) = timestamp {
156        frame = frame.with_timestamp(ts);
157    }
158
159    frame.encode()
160}
161
162/// Decode a frame and extract the message
163#[inline]
164pub fn decode(bytes: &[u8]) -> Result<(Message, Frame)> {
165    let frame = Frame::decode(bytes)?;
166    let message = decode_message(&frame.payload)?;
167    Ok((message, frame))
168}
169
170/// Helper to encode just the message payload (without frame) - binary encoding
171pub fn encode_payload(message: &Message) -> Result<Vec<u8>> {
172    let bytes = encode_message(message)?;
173    Ok(bytes.to_vec())
174}
175
176/// Helper to decode just a message payload (without frame)
177pub fn decode_payload(bytes: &[u8]) -> Result<Message> {
178    decode_message(bytes)
179}
180
181// ============================================================================
182// BINARY ENCODING
183// ============================================================================
184
185fn encode_message_to_buf(buf: &mut BytesMut, msg: &Message) -> Result<()> {
186    match msg {
187        Message::Hello(m) => encode_hello(buf, m),
188        Message::Welcome(m) => encode_welcome(buf, m),
189        Message::Announce(m) => encode_announce(buf, m),
190        Message::Subscribe(m) => encode_subscribe(buf, m),
191        Message::Unsubscribe(m) => encode_unsubscribe(buf, m),
192        Message::Publish(m) => encode_publish(buf, m),
193        Message::Set(m) => encode_set(buf, m),
194        Message::Get(m) => encode_get(buf, m),
195        Message::Snapshot(m) => encode_snapshot(buf, m),
196        Message::Replay(m) => encode_replay(buf, m),
197        Message::FederationSync(m) => encode_federation_sync(buf, m),
198        Message::Bundle(m) => encode_bundle(buf, m),
199        Message::Sync(m) => encode_sync(buf, m),
200        Message::Ping => {
201            buf.put_u8(msg::PING);
202            Ok(())
203        }
204        Message::Pong => {
205            buf.put_u8(msg::PONG);
206            Ok(())
207        }
208        Message::Ack(m) => encode_ack(buf, m),
209        Message::Error(m) => encode_error(buf, m),
210        Message::Query(m) => encode_query(buf, m),
211        Message::Result(m) => encode_result(buf, m),
212    }
213}
214
215/// SET (0x21) - Parameter Update
216/// Flags: [has_rev:1][lock:1][unlock:1][rsv:1][vtype:4]
217#[inline]
218fn encode_set(buf: &mut BytesMut, msg: &SetMessage) -> Result<()> {
219    buf.put_u8(msg::SET);
220
221    let vtype = value_type_code(&msg.value);
222    let mut flags = vtype & 0x0F;
223    if msg.revision.is_some() {
224        flags |= 0x80;
225    }
226    if msg.lock {
227        flags |= 0x40;
228    }
229    if msg.unlock {
230        flags |= 0x20;
231    }
232    if msg.ttl.is_some() {
233        flags |= 0x10;
234    }
235    buf.put_u8(flags);
236
237    // Address
238    encode_string(buf, &msg.address)?;
239
240    // Value (type already in flags for simple types)
241    encode_value_data(buf, &msg.value)?;
242
243    // Optional revision
244    if let Some(rev) = msg.revision {
245        buf.put_u64(rev);
246    }
247
248    // Optional TTL
249    if let Some(ttl) = msg.ttl {
250        let raw = match ttl {
251            Ttl::Never => 0u32,
252            Ttl::Sliding(secs) => secs & 0x7FFF_FFFF,
253            Ttl::Absolute(secs) => (secs & 0x7FFF_FFFF) | 0x8000_0000,
254        };
255        buf.put_u32(raw);
256    }
257
258    Ok(())
259}
260
261/// PUBLISH (0x20) - Event/Stream/Gesture
262/// Flags: [sig_type:3][has_ts:1][has_id:1][phase:3]
263fn encode_publish(buf: &mut BytesMut, msg: &PublishMessage) -> Result<()> {
264    buf.put_u8(msg::PUBLISH);
265
266    let sig_code = msg.signal.map(signal_type_code).unwrap_or(sig::EVENT);
267    let phase_code = msg.phase.map(gesture_phase_code).unwrap_or(phase::START);
268
269    let mut flags: u8 = (sig_code & 0x07) << 5;
270    if msg.timestamp.is_some() {
271        flags |= 0x10;
272    }
273    if msg.id.is_some() {
274        flags |= 0x08;
275    }
276    flags |= phase_code & 0x07;
277    buf.put_u8(flags);
278
279    // Address
280    encode_string(buf, &msg.address)?;
281
282    // Value/payload
283    if let Some(ref value) = msg.value {
284        buf.put_u8(1); // has value
285        buf.put_u8(value_type_code(value));
286        encode_value_data(buf, value)?;
287    } else if let Some(ref payload) = msg.payload {
288        buf.put_u8(1); // has payload
289        buf.put_u8(value_type_code(payload));
290        encode_value_data(buf, payload)?;
291    } else if let Some(ref samples) = msg.samples {
292        buf.put_u8(2); // has samples
293        buf.put_u16(samples.len() as u16);
294        for sample in samples {
295            buf.put_f64(*sample);
296        }
297    } else {
298        buf.put_u8(0); // no value
299    }
300
301    // Optional timestamp
302    if let Some(ts) = msg.timestamp {
303        buf.put_u64(ts);
304    }
305
306    // Optional gesture ID
307    if let Some(id) = msg.id {
308        buf.put_u32(id);
309    }
310
311    // Optional rate
312    if let Some(rate) = msg.rate {
313        buf.put_u32(rate);
314    }
315
316    Ok(())
317}
318
319/// HELLO (0x01)
320fn encode_hello(buf: &mut BytesMut, msg: &HelloMessage) -> Result<()> {
321    buf.put_u8(msg::HELLO);
322    buf.put_u8(msg.version);
323
324    // Feature flags
325    let mut features: u8 = 0;
326    for f in &msg.features {
327        match f.as_str() {
328            "param" => features |= 0x80,
329            "event" => features |= 0x40,
330            "stream" => features |= 0x20,
331            "gesture" => features |= 0x10,
332            "timeline" => features |= 0x08,
333            "federation" => features |= 0x04,
334            _ => {}
335        }
336    }
337    buf.put_u8(features);
338
339    // Name
340    encode_string(buf, &msg.name)?;
341
342    // Token (optional)
343    if let Some(ref token) = msg.token {
344        encode_string(buf, token)?;
345    } else {
346        buf.put_u16(0);
347    }
348
349    Ok(())
350}
351
352/// WELCOME (0x02)
353fn encode_welcome(buf: &mut BytesMut, msg: &WelcomeMessage) -> Result<()> {
354    buf.put_u8(msg::WELCOME);
355    buf.put_u8(msg.version);
356
357    // Feature flags (same as HELLO)
358    let mut features: u8 = 0;
359    for f in &msg.features {
360        match f.as_str() {
361            "param" => features |= 0x80,
362            "event" => features |= 0x40,
363            "stream" => features |= 0x20,
364            "gesture" => features |= 0x10,
365            "timeline" => features |= 0x08,
366            _ => {}
367        }
368    }
369    buf.put_u8(features);
370
371    // Server time
372    buf.put_u64(msg.time);
373
374    // Session ID
375    encode_string(buf, &msg.session)?;
376
377    // Server name
378    encode_string(buf, &msg.name)?;
379
380    // Token (optional)
381    if let Some(ref token) = msg.token {
382        encode_string(buf, token)?;
383    } else {
384        buf.put_u16(0);
385    }
386
387    Ok(())
388}
389
390/// ANNOUNCE (0x03)
391fn encode_announce(buf: &mut BytesMut, msg: &AnnounceMessage) -> Result<()> {
392    buf.put_u8(msg::ANNOUNCE);
393
394    encode_string(buf, &msg.namespace)?;
395    buf.put_u16(msg.signals.len() as u16);
396
397    for sig in &msg.signals {
398        encode_string(buf, &sig.address)?;
399        buf.put_u8(signal_type_code(sig.signal_type));
400
401        // Optional fields flags
402        let mut opt_flags: u8 = 0;
403        if sig.datatype.is_some() {
404            opt_flags |= 0x01;
405        }
406        if sig.access.is_some() {
407            opt_flags |= 0x02;
408        }
409        if sig.meta.is_some() {
410            opt_flags |= 0x04;
411        }
412        buf.put_u8(opt_flags);
413
414        if let Some(ref dt) = sig.datatype {
415            encode_string(buf, dt)?;
416        }
417        if let Some(ref access) = sig.access {
418            encode_string(buf, access)?;
419        }
420        if let Some(ref meta) = sig.meta {
421            // Encode meta as simple fields
422            let mut meta_flags: u8 = 0;
423            if meta.unit.is_some() {
424                meta_flags |= 0x01;
425            }
426            if meta.range.is_some() {
427                meta_flags |= 0x02;
428            }
429            if meta.default.is_some() {
430                meta_flags |= 0x04;
431            }
432            if meta.description.is_some() {
433                meta_flags |= 0x08;
434            }
435            buf.put_u8(meta_flags);
436
437            if let Some(ref unit) = meta.unit {
438                encode_string(buf, unit)?;
439            }
440            if let Some((min, max)) = meta.range {
441                buf.put_f64(min);
442                buf.put_f64(max);
443            }
444            if let Some(ref default) = meta.default {
445                buf.put_u8(value_type_code(default));
446                encode_value_data(buf, default)?;
447            }
448            if let Some(ref desc) = meta.description {
449                encode_string(buf, desc)?;
450            }
451        }
452    }
453
454    Ok(())
455}
456
457/// SUBSCRIBE (0x10)
458fn encode_subscribe(buf: &mut BytesMut, msg: &SubscribeMessage) -> Result<()> {
459    buf.put_u8(msg::SUBSCRIBE);
460    buf.put_u32(msg.id);
461
462    encode_string(buf, &msg.pattern)?;
463
464    // Signal type filter as bitmask
465    let mut type_mask: u8 = 0;
466    if msg.types.is_empty() {
467        type_mask = 0xFF; // All types
468    } else {
469        for t in &msg.types {
470            match t {
471                SignalType::Param => type_mask |= 0x01,
472                SignalType::Event => type_mask |= 0x02,
473                SignalType::Stream => type_mask |= 0x04,
474                SignalType::Gesture => type_mask |= 0x08,
475                SignalType::Timeline => type_mask |= 0x10,
476            }
477        }
478    }
479    buf.put_u8(type_mask);
480
481    // Options
482    if let Some(ref opts) = msg.options {
483        let mut opt_flags: u8 = 0;
484        if opts.max_rate.is_some() {
485            opt_flags |= 0x01;
486        }
487        if opts.epsilon.is_some() {
488            opt_flags |= 0x02;
489        }
490        if opts.history.is_some() {
491            opt_flags |= 0x04;
492        }
493        if opts.window.is_some() {
494            opt_flags |= 0x08;
495        }
496        buf.put_u8(opt_flags);
497
498        if let Some(rate) = opts.max_rate {
499            buf.put_u32(rate);
500        }
501        if let Some(eps) = opts.epsilon {
502            buf.put_f64(eps);
503        }
504        if let Some(hist) = opts.history {
505            buf.put_u32(hist);
506        }
507        if let Some(win) = opts.window {
508            buf.put_u32(win);
509        }
510    } else {
511        buf.put_u8(0); // No options
512    }
513
514    Ok(())
515}
516
517/// UNSUBSCRIBE (0x11)
518fn encode_unsubscribe(buf: &mut BytesMut, msg: &UnsubscribeMessage) -> Result<()> {
519    buf.put_u8(msg::UNSUBSCRIBE);
520    buf.put_u32(msg.id);
521    Ok(())
522}
523
524/// GET (0x22)
525fn encode_get(buf: &mut BytesMut, msg: &GetMessage) -> Result<()> {
526    buf.put_u8(msg::GET);
527    encode_string(buf, &msg.address)?;
528    Ok(())
529}
530
531/// SNAPSHOT (0x23)
532fn encode_snapshot(buf: &mut BytesMut, msg: &SnapshotMessage) -> Result<()> {
533    buf.put_u8(msg::SNAPSHOT);
534    buf.put_u16(msg.params.len() as u16);
535
536    for param in &msg.params {
537        encode_string(buf, &param.address)?;
538        buf.put_u8(value_type_code(&param.value));
539        encode_value_data(buf, &param.value)?;
540        buf.put_u64(param.revision);
541
542        let mut opt_flags: u8 = 0;
543        if param.writer.is_some() {
544            opt_flags |= 0x01;
545        }
546        if param.timestamp.is_some() {
547            opt_flags |= 0x02;
548        }
549        buf.put_u8(opt_flags);
550
551        if let Some(ref writer) = param.writer {
552            encode_string(buf, writer)?;
553        }
554        if let Some(ts) = param.timestamp {
555            buf.put_u64(ts);
556        }
557    }
558
559    Ok(())
560}
561
562/// REPLAY (0x24) - Journal replay request
563/// Flags: [has_from:1][has_to:1][has_limit:1][rsv:5]
564fn encode_replay(buf: &mut BytesMut, msg: &ReplayMessage) -> Result<()> {
565    buf.put_u8(msg::REPLAY);
566
567    let mut flags: u8 = 0;
568    if msg.from.is_some() {
569        flags |= 0x80;
570    }
571    if msg.to.is_some() {
572        flags |= 0x40;
573    }
574    if msg.limit.is_some() {
575        flags |= 0x20;
576    }
577    buf.put_u8(flags);
578
579    encode_string(buf, &msg.pattern)?;
580
581    if let Some(from) = msg.from {
582        buf.put_u64(from);
583    }
584    if let Some(to) = msg.to {
585        buf.put_u64(to);
586    }
587    if let Some(limit) = msg.limit {
588        buf.put_u32(limit);
589    }
590
591    // Signal type filter as bitmask (same format as SUBSCRIBE)
592    let mut type_mask: u8 = 0;
593    if msg.types.is_empty() {
594        type_mask = 0xFF; // All types
595    } else {
596        for t in &msg.types {
597            match t {
598                SignalType::Param => type_mask |= 0x01,
599                SignalType::Event => type_mask |= 0x02,
600                SignalType::Stream => type_mask |= 0x04,
601                SignalType::Gesture => type_mask |= 0x08,
602                SignalType::Timeline => type_mask |= 0x10,
603            }
604        }
605    }
606    buf.put_u8(type_mask);
607
608    Ok(())
609}
610
611/// FEDERATION_SYNC (0x04) - Router-to-router federation
612/// Layout: [op:u8][pattern_count:u16][patterns...][revision_count:u16][revisions...][flags:u8][optional fields]
613fn encode_federation_sync(buf: &mut BytesMut, msg: &FederationSyncMessage) -> Result<()> {
614    buf.put_u8(msg::FEDERATION_SYNC);
615    buf.put_u8(msg.op as u8);
616
617    // Encode patterns
618    buf.put_u16(msg.patterns.len() as u16);
619    for pattern in &msg.patterns {
620        encode_string(buf, pattern)?;
621    }
622
623    // Encode revisions map
624    buf.put_u16(msg.revisions.len() as u16);
625    for (key, val) in &msg.revisions {
626        encode_string(buf, key)?;
627        buf.put_u64(*val);
628    }
629
630    // Flags: [has_since:1][has_origin:1][rsv:6]
631    let mut flags: u8 = 0;
632    if msg.since_revision.is_some() {
633        flags |= 0x80;
634    }
635    if msg.origin.is_some() {
636        flags |= 0x40;
637    }
638    buf.put_u8(flags);
639
640    if let Some(since) = msg.since_revision {
641        buf.put_u64(since);
642    }
643    if let Some(ref origin) = msg.origin {
644        encode_string(buf, origin)?;
645    }
646
647    Ok(())
648}
649
650/// BUNDLE (0x30)
651fn encode_bundle(buf: &mut BytesMut, msg: &BundleMessage) -> Result<()> {
652    buf.put_u8(msg::BUNDLE);
653
654    let mut flags: u8 = 0;
655    if msg.timestamp.is_some() {
656        flags |= 0x80;
657    }
658    buf.put_u8(flags);
659
660    buf.put_u16(msg.messages.len() as u16);
661
662    if let Some(ts) = msg.timestamp {
663        buf.put_u64(ts);
664    }
665
666    // Each message prefixed with length
667    for inner_msg in &msg.messages {
668        let mut inner_buf = BytesMut::with_capacity(64);
669        encode_message_to_buf(&mut inner_buf, inner_msg)?;
670        buf.put_u16(inner_buf.len() as u16);
671        buf.extend_from_slice(&inner_buf);
672    }
673
674    Ok(())
675}
676
677/// SYNC (0x40)
678fn encode_sync(buf: &mut BytesMut, msg: &SyncMessage) -> Result<()> {
679    buf.put_u8(msg::SYNC);
680
681    let mut flags: u8 = 0;
682    if msg.t2.is_some() {
683        flags |= 0x01;
684    }
685    if msg.t3.is_some() {
686        flags |= 0x02;
687    }
688    buf.put_u8(flags);
689
690    buf.put_u64(msg.t1);
691    if let Some(t2) = msg.t2 {
692        buf.put_u64(t2);
693    }
694    if let Some(t3) = msg.t3 {
695        buf.put_u64(t3);
696    }
697
698    Ok(())
699}
700
701/// ACK (0x50)
702fn encode_ack(buf: &mut BytesMut, msg: &AckMessage) -> Result<()> {
703    buf.put_u8(msg::ACK);
704
705    let mut flags: u8 = 0;
706    if msg.address.is_some() {
707        flags |= 0x01;
708    }
709    if msg.revision.is_some() {
710        flags |= 0x02;
711    }
712    if msg.locked.is_some() {
713        flags |= 0x04;
714    }
715    if msg.holder.is_some() {
716        flags |= 0x08;
717    }
718    if msg.correlation_id.is_some() {
719        flags |= 0x10;
720    }
721    buf.put_u8(flags);
722
723    if let Some(ref addr) = msg.address {
724        encode_string(buf, addr)?;
725    }
726    if let Some(rev) = msg.revision {
727        buf.put_u64(rev);
728    }
729    if let Some(locked) = msg.locked {
730        buf.put_u8(if locked { 1 } else { 0 });
731    }
732    if let Some(ref holder) = msg.holder {
733        encode_string(buf, holder)?;
734    }
735    if let Some(corr) = msg.correlation_id {
736        buf.put_u32(corr);
737    }
738
739    Ok(())
740}
741
742/// ERROR (0x51)
743fn encode_error(buf: &mut BytesMut, msg: &ErrorMessage) -> Result<()> {
744    buf.put_u8(msg::ERROR);
745    buf.put_u16(msg.code);
746    encode_string(buf, &msg.message)?;
747
748    let mut flags: u8 = 0;
749    if msg.address.is_some() {
750        flags |= 0x01;
751    }
752    if msg.correlation_id.is_some() {
753        flags |= 0x02;
754    }
755    buf.put_u8(flags);
756
757    if let Some(ref addr) = msg.address {
758        encode_string(buf, addr)?;
759    }
760    if let Some(corr) = msg.correlation_id {
761        buf.put_u32(corr);
762    }
763
764    Ok(())
765}
766
767/// QUERY (0x60)
768fn encode_query(buf: &mut BytesMut, msg: &QueryMessage) -> Result<()> {
769    buf.put_u8(msg::QUERY);
770    encode_string(buf, &msg.pattern)?;
771    Ok(())
772}
773
774/// RESULT (0x61)
775fn encode_result(buf: &mut BytesMut, msg: &ResultMessage) -> Result<()> {
776    buf.put_u8(msg::RESULT);
777    buf.put_u16(msg.signals.len() as u16);
778
779    for sig in &msg.signals {
780        encode_string(buf, &sig.address)?;
781        buf.put_u8(signal_type_code(sig.signal_type));
782
783        let mut opt_flags: u8 = 0;
784        if sig.datatype.is_some() {
785            opt_flags |= 0x01;
786        }
787        if sig.access.is_some() {
788            opt_flags |= 0x02;
789        }
790        buf.put_u8(opt_flags);
791
792        if let Some(ref dt) = sig.datatype {
793            encode_string(buf, dt)?;
794        }
795        if let Some(ref access) = sig.access {
796            encode_string(buf, access)?;
797        }
798    }
799
800    Ok(())
801}
802
803// ============================================================================
804// VALUE ENCODING HELPERS
805// ============================================================================
806
807#[inline(always)]
808fn encode_string(buf: &mut BytesMut, s: &str) -> Result<()> {
809    let bytes = s.as_bytes();
810    if bytes.len() > u16::MAX as usize {
811        return Err(Error::PayloadTooLarge(bytes.len()));
812    }
813    buf.put_u16(bytes.len() as u16);
814    buf.extend_from_slice(bytes);
815    Ok(())
816}
817
818#[inline]
819fn encode_value_data(buf: &mut BytesMut, value: &Value) -> Result<()> {
820    match value {
821        Value::Null => {} // Type code is enough
822        Value::Bool(b) => buf.put_u8(if *b { 1 } else { 0 }),
823        Value::Int(i) => buf.put_i64(*i),
824        Value::Float(f) => buf.put_f64(*f),
825        Value::String(s) => encode_string(buf, s)?,
826        Value::Bytes(b) => {
827            buf.put_u16(b.len() as u16);
828            buf.extend_from_slice(b);
829        }
830        Value::Array(arr) => {
831            buf.put_u16(arr.len() as u16);
832            for item in arr {
833                buf.put_u8(value_type_code(item));
834                encode_value_data(buf, item)?;
835            }
836        }
837        Value::Map(map) => {
838            buf.put_u16(map.len() as u16);
839            for (key, val) in map {
840                encode_string(buf, key)?;
841                buf.put_u8(value_type_code(val));
842                encode_value_data(buf, val)?;
843            }
844        }
845    }
846    Ok(())
847}
848
849#[inline(always)]
850fn value_type_code(value: &Value) -> u8 {
851    match value {
852        Value::Null => val::NULL,
853        Value::Bool(_) => val::BOOL,
854        Value::Int(_) => val::I64,
855        Value::Float(_) => val::F64,
856        Value::String(_) => val::STRING,
857        Value::Bytes(_) => val::BYTES,
858        Value::Array(_) => val::ARRAY,
859        Value::Map(_) => val::MAP,
860    }
861}
862
863fn signal_type_code(sig: SignalType) -> u8 {
864    match sig {
865        SignalType::Param => sig::PARAM,
866        SignalType::Event => sig::EVENT,
867        SignalType::Stream => sig::STREAM,
868        SignalType::Gesture => sig::GESTURE,
869        SignalType::Timeline => sig::TIMELINE,
870    }
871}
872
873fn gesture_phase_code(phase: GesturePhase) -> u8 {
874    match phase {
875        GesturePhase::Start => phase::START,
876        GesturePhase::Move => phase::MOVE,
877        GesturePhase::End => phase::END,
878        GesturePhase::Cancel => phase::CANCEL,
879    }
880}
881
882// ============================================================================
883// BINARY DECODING
884// ============================================================================
885
886fn decode_v3_binary(bytes: &[u8]) -> Result<Message> {
887    if bytes.is_empty() {
888        return Err(Error::BufferTooSmall { needed: 1, have: 0 });
889    }
890
891    let mut buf = bytes;
892    let msg_type = buf.get_u8();
893
894    match msg_type {
895        msg::HELLO => decode_hello(&mut buf),
896        msg::WELCOME => decode_welcome(&mut buf),
897        msg::ANNOUNCE => decode_announce(&mut buf),
898        msg::SUBSCRIBE => decode_subscribe(&mut buf),
899        msg::UNSUBSCRIBE => decode_unsubscribe(&mut buf),
900        msg::PUBLISH => decode_publish(&mut buf),
901        msg::SET => decode_set(&mut buf),
902        msg::GET => decode_get(&mut buf),
903        msg::SNAPSHOT => decode_snapshot(&mut buf),
904        msg::REPLAY => decode_replay(&mut buf),
905        msg::FEDERATION_SYNC => decode_federation_sync(&mut buf),
906        msg::BUNDLE => decode_bundle(&mut buf),
907        msg::SYNC => decode_sync(&mut buf),
908        msg::PING => Ok(Message::Ping),
909        msg::PONG => Ok(Message::Pong),
910        msg::ACK => decode_ack(&mut buf),
911        msg::ERROR => decode_error(&mut buf),
912        msg::QUERY => decode_query(&mut buf),
913        msg::RESULT => decode_result(&mut buf),
914        _ => Err(Error::UnknownMessageType(msg_type)),
915    }
916}
917
918#[inline]
919fn decode_set(buf: &mut &[u8]) -> Result<Message> {
920    let flags = buf.get_u8();
921    let vtype = flags & 0x0F;
922    let has_rev = (flags & 0x80) != 0;
923    let lock = (flags & 0x40) != 0;
924    let unlock = (flags & 0x20) != 0;
925    let has_ttl = (flags & 0x10) != 0;
926
927    let address = decode_string(buf)?;
928    let value = decode_value_data(buf, vtype)?;
929
930    let revision = if has_rev { Some(buf.get_u64()) } else { None };
931
932    let ttl = if has_ttl {
933        let raw = buf.get_u32();
934        if raw == 0 {
935            Some(Ttl::Never)
936        } else if raw & 0x8000_0000 != 0 {
937            Some(Ttl::Absolute(raw & 0x7FFF_FFFF))
938        } else {
939            Some(Ttl::Sliding(raw))
940        }
941    } else {
942        None
943    };
944
945    Ok(Message::Set(SetMessage {
946        address,
947        value,
948        revision,
949        lock,
950        unlock,
951        ttl,
952    }))
953}
954
955fn decode_publish(buf: &mut &[u8]) -> Result<Message> {
956    let flags = buf.get_u8();
957    let sig_code = (flags >> 5) & 0x07;
958    let has_ts = (flags & 0x10) != 0;
959    let has_id = (flags & 0x08) != 0;
960    let phase_code = flags & 0x07;
961
962    let address = decode_string(buf)?;
963
964    // Value indicator
965    let value_indicator = buf.get_u8();
966    let (value, payload, samples) = match value_indicator {
967        0 => (None, None, None),
968        1 => {
969            let vtype = buf.get_u8();
970            let v = decode_value_data(buf, vtype)?;
971            (Some(v), None, None)
972        }
973        2 => {
974            let count = buf.get_u16() as usize;
975            let mut s = Vec::with_capacity(count);
976            for _ in 0..count {
977                s.push(buf.get_f64());
978            }
979            (None, None, Some(s))
980        }
981        _ => (None, None, None),
982    };
983
984    let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
985    let id = if has_id { Some(buf.get_u32()) } else { None };
986
987    // Rate (if remaining bytes)
988    let rate = if buf.remaining() >= 4 {
989        Some(buf.get_u32())
990    } else {
991        None
992    };
993
994    let signal = Some(signal_type_from_code(sig_code));
995    let phase = Some(gesture_phase_from_code(phase_code));
996
997    Ok(Message::Publish(PublishMessage {
998        address,
999        signal,
1000        value,
1001        payload,
1002        samples,
1003        rate,
1004        id,
1005        phase,
1006        timestamp,
1007        timeline: None, // Timeline data is encoded separately when signal is Timeline
1008    }))
1009}
1010
1011fn decode_hello(buf: &mut &[u8]) -> Result<Message> {
1012    let version = buf.get_u8();
1013    let feature_flags = buf.get_u8();
1014
1015    let mut features = Vec::new();
1016    if feature_flags & 0x80 != 0 {
1017        features.push("param".to_string());
1018    }
1019    if feature_flags & 0x40 != 0 {
1020        features.push("event".to_string());
1021    }
1022    if feature_flags & 0x20 != 0 {
1023        features.push("stream".to_string());
1024    }
1025    if feature_flags & 0x10 != 0 {
1026        features.push("gesture".to_string());
1027    }
1028    if feature_flags & 0x08 != 0 {
1029        features.push("timeline".to_string());
1030    }
1031    if feature_flags & 0x04 != 0 {
1032        features.push("federation".to_string());
1033    }
1034
1035    let name = decode_string(buf)?;
1036    let token_str = decode_string(buf)?;
1037    let token = if token_str.is_empty() {
1038        None
1039    } else {
1040        Some(token_str)
1041    };
1042
1043    Ok(Message::Hello(HelloMessage {
1044        version,
1045        name,
1046        features,
1047        capabilities: None,
1048        token,
1049    }))
1050}
1051
1052fn decode_welcome(buf: &mut &[u8]) -> Result<Message> {
1053    let version = buf.get_u8();
1054    let feature_flags = buf.get_u8();
1055
1056    let mut features = Vec::new();
1057    if feature_flags & 0x80 != 0 {
1058        features.push("param".to_string());
1059    }
1060    if feature_flags & 0x40 != 0 {
1061        features.push("event".to_string());
1062    }
1063    if feature_flags & 0x20 != 0 {
1064        features.push("stream".to_string());
1065    }
1066    if feature_flags & 0x10 != 0 {
1067        features.push("gesture".to_string());
1068    }
1069    if feature_flags & 0x08 != 0 {
1070        features.push("timeline".to_string());
1071    }
1072
1073    let time = buf.get_u64();
1074    let session = decode_string(buf)?;
1075    let name = decode_string(buf)?;
1076
1077    let token_str = decode_string(buf)?;
1078    let token = if token_str.is_empty() {
1079        None
1080    } else {
1081        Some(token_str)
1082    };
1083
1084    Ok(Message::Welcome(WelcomeMessage {
1085        version,
1086        session,
1087        name,
1088        features,
1089        time,
1090        token,
1091    }))
1092}
1093
1094fn decode_announce(buf: &mut &[u8]) -> Result<Message> {
1095    let namespace = decode_string(buf)?;
1096    let count = buf.get_u16() as usize;
1097
1098    let mut signals = Vec::with_capacity(count);
1099    for _ in 0..count {
1100        let address = decode_string(buf)?;
1101        let sig_code = buf.get_u8();
1102        let opt_flags = buf.get_u8();
1103
1104        let datatype = if opt_flags & 0x01 != 0 {
1105            Some(decode_string(buf)?)
1106        } else {
1107            None
1108        };
1109        let access = if opt_flags & 0x02 != 0 {
1110            Some(decode_string(buf)?)
1111        } else {
1112            None
1113        };
1114
1115        let meta = if opt_flags & 0x04 != 0 {
1116            let meta_flags = buf.get_u8();
1117
1118            let unit = if meta_flags & 0x01 != 0 {
1119                Some(decode_string(buf)?)
1120            } else {
1121                None
1122            };
1123            let range = if meta_flags & 0x02 != 0 {
1124                let min = buf.get_f64();
1125                let max = buf.get_f64();
1126                Some((min, max))
1127            } else {
1128                None
1129            };
1130            let default = if meta_flags & 0x04 != 0 {
1131                let vtype = buf.get_u8();
1132                Some(decode_value_data(buf, vtype)?)
1133            } else {
1134                None
1135            };
1136            let description = if meta_flags & 0x08 != 0 {
1137                Some(decode_string(buf)?)
1138            } else {
1139                None
1140            };
1141
1142            Some(SignalMeta {
1143                unit,
1144                range,
1145                default,
1146                description,
1147            })
1148        } else {
1149            None
1150        };
1151
1152        signals.push(SignalDefinition {
1153            address,
1154            signal_type: signal_type_from_code(sig_code),
1155            datatype,
1156            access,
1157            meta,
1158        });
1159    }
1160
1161    Ok(Message::Announce(AnnounceMessage {
1162        namespace,
1163        signals,
1164        meta: None,
1165    }))
1166}
1167
1168fn decode_subscribe(buf: &mut &[u8]) -> Result<Message> {
1169    let id = buf.get_u32();
1170    let pattern = decode_string(buf)?;
1171    let type_mask = buf.get_u8();
1172
1173    let mut types = Vec::new();
1174    if type_mask == 0xFF {
1175        // All types, leave empty to indicate all
1176    } else {
1177        if type_mask & 0x01 != 0 {
1178            types.push(SignalType::Param);
1179        }
1180        if type_mask & 0x02 != 0 {
1181            types.push(SignalType::Event);
1182        }
1183        if type_mask & 0x04 != 0 {
1184            types.push(SignalType::Stream);
1185        }
1186        if type_mask & 0x08 != 0 {
1187            types.push(SignalType::Gesture);
1188        }
1189        if type_mask & 0x10 != 0 {
1190            types.push(SignalType::Timeline);
1191        }
1192    }
1193
1194    let opt_flags = buf.get_u8();
1195    let options = if opt_flags != 0 {
1196        let max_rate = if opt_flags & 0x01 != 0 {
1197            Some(buf.get_u32())
1198        } else {
1199            None
1200        };
1201        let epsilon = if opt_flags & 0x02 != 0 {
1202            Some(buf.get_f64())
1203        } else {
1204            None
1205        };
1206        let history = if opt_flags & 0x04 != 0 {
1207            Some(buf.get_u32())
1208        } else {
1209            None
1210        };
1211        let window = if opt_flags & 0x08 != 0 {
1212            Some(buf.get_u32())
1213        } else {
1214            None
1215        };
1216
1217        Some(SubscribeOptions {
1218            max_rate,
1219            epsilon,
1220            history,
1221            window,
1222        })
1223    } else {
1224        None
1225    };
1226
1227    Ok(Message::Subscribe(SubscribeMessage {
1228        id,
1229        pattern,
1230        types,
1231        options,
1232    }))
1233}
1234
1235fn decode_unsubscribe(buf: &mut &[u8]) -> Result<Message> {
1236    let id = buf.get_u32();
1237    Ok(Message::Unsubscribe(UnsubscribeMessage { id }))
1238}
1239
1240fn decode_get(buf: &mut &[u8]) -> Result<Message> {
1241    let address = decode_string(buf)?;
1242    Ok(Message::Get(GetMessage { address }))
1243}
1244
1245fn decode_snapshot(buf: &mut &[u8]) -> Result<Message> {
1246    let count = buf.get_u16() as usize;
1247    let mut params = Vec::with_capacity(count);
1248
1249    for _ in 0..count {
1250        let address = decode_string(buf)?;
1251        let vtype = buf.get_u8();
1252        let value = decode_value_data(buf, vtype)?;
1253        let revision = buf.get_u64();
1254        let opt_flags = buf.get_u8();
1255
1256        let writer = if opt_flags & 0x01 != 0 {
1257            Some(decode_string(buf)?)
1258        } else {
1259            None
1260        };
1261        let timestamp = if opt_flags & 0x02 != 0 {
1262            Some(buf.get_u64())
1263        } else {
1264            None
1265        };
1266
1267        params.push(ParamValue {
1268            address,
1269            value,
1270            revision,
1271            writer,
1272            timestamp,
1273        });
1274    }
1275
1276    Ok(Message::Snapshot(SnapshotMessage { params }))
1277}
1278
1279fn decode_replay(buf: &mut &[u8]) -> Result<Message> {
1280    let flags = buf.get_u8();
1281    let has_from = (flags & 0x80) != 0;
1282    let has_to = (flags & 0x40) != 0;
1283    let has_limit = (flags & 0x20) != 0;
1284
1285    let pattern = decode_string(buf)?;
1286
1287    let from = if has_from { Some(buf.get_u64()) } else { None };
1288    let to = if has_to { Some(buf.get_u64()) } else { None };
1289    let limit = if has_limit { Some(buf.get_u32()) } else { None };
1290
1291    let type_mask = buf.get_u8();
1292    let mut types = Vec::new();
1293    if type_mask != 0xFF {
1294        if type_mask & 0x01 != 0 {
1295            types.push(SignalType::Param);
1296        }
1297        if type_mask & 0x02 != 0 {
1298            types.push(SignalType::Event);
1299        }
1300        if type_mask & 0x04 != 0 {
1301            types.push(SignalType::Stream);
1302        }
1303        if type_mask & 0x08 != 0 {
1304            types.push(SignalType::Gesture);
1305        }
1306        if type_mask & 0x10 != 0 {
1307            types.push(SignalType::Timeline);
1308        }
1309    }
1310
1311    Ok(Message::Replay(ReplayMessage {
1312        pattern,
1313        from,
1314        to,
1315        limit,
1316        types,
1317    }))
1318}
1319
1320fn decode_federation_sync(buf: &mut &[u8]) -> Result<Message> {
1321    let op_byte = buf.get_u8();
1322    let op = match op_byte {
1323        0x01 => FederationOp::DeclareNamespaces,
1324        0x02 => FederationOp::RequestSync,
1325        0x03 => FederationOp::RevisionVector,
1326        0x04 => FederationOp::SyncComplete,
1327        _ => return Err(Error::DecodeError("unknown federation op".into())),
1328    };
1329
1330    // Decode patterns
1331    let pattern_count = buf.get_u16() as usize;
1332    let mut patterns = Vec::with_capacity(pattern_count);
1333    for _ in 0..pattern_count {
1334        patterns.push(decode_string(buf)?);
1335    }
1336
1337    // Decode revisions map
1338    let revision_count = buf.get_u16() as usize;
1339    let mut revisions = std::collections::HashMap::with_capacity(revision_count);
1340    for _ in 0..revision_count {
1341        let key = decode_string(buf)?;
1342        let val = buf.get_u64();
1343        revisions.insert(key, val);
1344    }
1345
1346    // Flags: [has_since:1][has_origin:1][rsv:6]
1347    let flags = buf.get_u8();
1348    let since_revision = if flags & 0x80 != 0 {
1349        Some(buf.get_u64())
1350    } else {
1351        None
1352    };
1353    let origin = if flags & 0x40 != 0 {
1354        Some(decode_string(buf)?)
1355    } else {
1356        None
1357    };
1358
1359    Ok(Message::FederationSync(FederationSyncMessage {
1360        op,
1361        patterns,
1362        revisions,
1363        since_revision,
1364        origin,
1365    }))
1366}
1367
1368fn decode_bundle(buf: &mut &[u8]) -> Result<Message> {
1369    let flags = buf.get_u8();
1370    let has_ts = (flags & 0x80) != 0;
1371    let count = buf.get_u16() as usize;
1372
1373    let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
1374
1375    let mut messages = Vec::with_capacity(count);
1376    for _ in 0..count {
1377        let len = buf.get_u16() as usize;
1378        let inner_bytes = &buf[..len];
1379        buf.advance(len);
1380        messages.push(decode_v3_binary(inner_bytes)?);
1381    }
1382
1383    Ok(Message::Bundle(BundleMessage {
1384        timestamp,
1385        messages,
1386    }))
1387}
1388
1389fn decode_sync(buf: &mut &[u8]) -> Result<Message> {
1390    let flags = buf.get_u8();
1391    let t1 = buf.get_u64();
1392    let t2 = if flags & 0x01 != 0 {
1393        Some(buf.get_u64())
1394    } else {
1395        None
1396    };
1397    let t3 = if flags & 0x02 != 0 {
1398        Some(buf.get_u64())
1399    } else {
1400        None
1401    };
1402
1403    Ok(Message::Sync(SyncMessage { t1, t2, t3 }))
1404}
1405
1406fn decode_ack(buf: &mut &[u8]) -> Result<Message> {
1407    let flags = buf.get_u8();
1408
1409    let address = if flags & 0x01 != 0 {
1410        Some(decode_string(buf)?)
1411    } else {
1412        None
1413    };
1414    let revision = if flags & 0x02 != 0 {
1415        Some(buf.get_u64())
1416    } else {
1417        None
1418    };
1419    let locked = if flags & 0x04 != 0 {
1420        Some(buf.get_u8() != 0)
1421    } else {
1422        None
1423    };
1424    let holder = if flags & 0x08 != 0 {
1425        Some(decode_string(buf)?)
1426    } else {
1427        None
1428    };
1429    let correlation_id = if flags & 0x10 != 0 {
1430        Some(buf.get_u32())
1431    } else {
1432        None
1433    };
1434
1435    Ok(Message::Ack(AckMessage {
1436        address,
1437        revision,
1438        locked,
1439        holder,
1440        correlation_id,
1441    }))
1442}
1443
1444fn decode_error(buf: &mut &[u8]) -> Result<Message> {
1445    let code = buf.get_u16();
1446    let message = decode_string(buf)?;
1447    let flags = buf.get_u8();
1448
1449    let address = if flags & 0x01 != 0 {
1450        Some(decode_string(buf)?)
1451    } else {
1452        None
1453    };
1454    let correlation_id = if flags & 0x02 != 0 {
1455        Some(buf.get_u32())
1456    } else {
1457        None
1458    };
1459
1460    Ok(Message::Error(ErrorMessage {
1461        code,
1462        message,
1463        address,
1464        correlation_id,
1465    }))
1466}
1467
1468fn decode_query(buf: &mut &[u8]) -> Result<Message> {
1469    let pattern = decode_string(buf)?;
1470    Ok(Message::Query(QueryMessage { pattern }))
1471}
1472
1473fn decode_result(buf: &mut &[u8]) -> Result<Message> {
1474    let count = buf.get_u16() as usize;
1475    let mut signals = Vec::with_capacity(count);
1476
1477    for _ in 0..count {
1478        let address = decode_string(buf)?;
1479        let sig_code = buf.get_u8();
1480        let opt_flags = buf.get_u8();
1481
1482        let datatype = if opt_flags & 0x01 != 0 {
1483            Some(decode_string(buf)?)
1484        } else {
1485            None
1486        };
1487        let access = if opt_flags & 0x02 != 0 {
1488            Some(decode_string(buf)?)
1489        } else {
1490            None
1491        };
1492
1493        signals.push(SignalDefinition {
1494            address,
1495            signal_type: signal_type_from_code(sig_code),
1496            datatype,
1497            access,
1498            meta: None,
1499        });
1500    }
1501
1502    Ok(Message::Result(ResultMessage { signals }))
1503}
1504
1505// ============================================================================
1506// VALUE DECODING HELPERS
1507// ============================================================================
1508
1509#[inline(always)]
1510fn decode_string(buf: &mut &[u8]) -> Result<String> {
1511    if buf.remaining() < 2 {
1512        return Err(Error::BufferTooSmall {
1513            needed: 2,
1514            have: buf.remaining(),
1515        });
1516    }
1517    let len = buf.get_u16() as usize;
1518    if buf.remaining() < len {
1519        return Err(Error::BufferTooSmall {
1520            needed: len,
1521            have: buf.remaining(),
1522        });
1523    }
1524    let bytes = &buf[..len];
1525    buf.advance(len);
1526    String::from_utf8(bytes.to_vec()).map_err(|e| Error::DecodeError(e.to_string()))
1527}
1528
1529#[inline]
1530fn decode_value_data(buf: &mut &[u8], vtype: u8) -> Result<Value> {
1531    match vtype {
1532        val::NULL => Ok(Value::Null),
1533        val::BOOL => {
1534            let b = buf.get_u8();
1535            Ok(Value::Bool(b != 0))
1536        }
1537        val::I8 => {
1538            let i = buf.get_i8() as i64;
1539            Ok(Value::Int(i))
1540        }
1541        val::I16 => {
1542            let i = buf.get_i16() as i64;
1543            Ok(Value::Int(i))
1544        }
1545        val::I32 => {
1546            let i = buf.get_i32() as i64;
1547            Ok(Value::Int(i))
1548        }
1549        val::I64 => {
1550            let i = buf.get_i64();
1551            Ok(Value::Int(i))
1552        }
1553        val::F32 => {
1554            let f = buf.get_f32() as f64;
1555            Ok(Value::Float(f))
1556        }
1557        val::F64 => {
1558            let f = buf.get_f64();
1559            Ok(Value::Float(f))
1560        }
1561        val::STRING => {
1562            let s = decode_string(buf)?;
1563            Ok(Value::String(s))
1564        }
1565        val::BYTES => {
1566            if buf.remaining() < 2 {
1567                return Err(Error::BufferTooSmall {
1568                    needed: 2,
1569                    have: buf.remaining(),
1570                });
1571            }
1572            let len = buf.get_u16() as usize;
1573            if buf.remaining() < len {
1574                return Err(Error::BufferTooSmall {
1575                    needed: len,
1576                    have: buf.remaining(),
1577                });
1578            }
1579            let bytes = buf[..len].to_vec();
1580            buf.advance(len);
1581            Ok(Value::Bytes(bytes))
1582        }
1583        val::ARRAY => {
1584            let count = buf.get_u16() as usize;
1585            let mut arr = Vec::with_capacity(count);
1586            for _ in 0..count {
1587                let item_type = buf.get_u8();
1588                arr.push(decode_value_data(buf, item_type)?);
1589            }
1590            Ok(Value::Array(arr))
1591        }
1592        val::MAP => {
1593            let count = buf.get_u16() as usize;
1594            let mut map = HashMap::with_capacity(count);
1595            for _ in 0..count {
1596                let key = decode_string(buf)?;
1597                let val_type = buf.get_u8();
1598                let val = decode_value_data(buf, val_type)?;
1599                map.insert(key, val);
1600            }
1601            Ok(Value::Map(map))
1602        }
1603        _ => Err(Error::DecodeError(format!(
1604            "unknown value type: 0x{:02x}",
1605            vtype
1606        ))),
1607    }
1608}
1609
1610fn signal_type_from_code(code: u8) -> SignalType {
1611    match code {
1612        sig::PARAM => SignalType::Param,
1613        sig::EVENT => SignalType::Event,
1614        sig::STREAM => SignalType::Stream,
1615        sig::GESTURE => SignalType::Gesture,
1616        sig::TIMELINE => SignalType::Timeline,
1617        _ => SignalType::Event, // Default
1618    }
1619}
1620
1621fn gesture_phase_from_code(code: u8) -> GesturePhase {
1622    match code {
1623        phase::START => GesturePhase::Start,
1624        phase::MOVE => GesturePhase::Move,
1625        phase::END => GesturePhase::End,
1626        phase::CANCEL => GesturePhase::Cancel,
1627        _ => GesturePhase::Start, // Default
1628    }
1629}
1630
1631// ============================================================================
1632// V2 MESSAGEPACK DECODING (BACKWARD COMPATIBILITY)
1633// ============================================================================
1634
1635fn is_msgpack_map(byte: u8) -> bool {
1636    // fixmap: 0x80-0x8F, map16: 0xDE, map32: 0xDF
1637    (byte & 0xF0) == 0x80 || byte == 0xDE || byte == 0xDF
1638}
1639
1640fn decode_v2_msgpack(bytes: &[u8]) -> Result<Message> {
1641    rmp_serde::from_slice(bytes).map_err(|e| Error::DecodeError(e.to_string()))
1642}
1643
1644// ============================================================================
1645// TESTS
1646// ============================================================================
1647
1648#[cfg(test)]
1649mod tests {
1650    use super::*;
1651
1652    #[test]
1653    fn test_hello_roundtrip() {
1654        let msg = Message::Hello(HelloMessage {
1655            version: 1,
1656            name: "Test Client".to_string(),
1657            features: vec!["param".to_string(), "event".to_string()],
1658            capabilities: None,
1659            token: None,
1660        });
1661
1662        let encoded = encode(&msg).unwrap();
1663        let (decoded, frame) = decode(&encoded).unwrap();
1664
1665        match decoded {
1666            Message::Hello(hello) => {
1667                assert_eq!(hello.version, 1);
1668                assert_eq!(hello.name, "Test Client");
1669                assert!(hello.features.contains(&"param".to_string()));
1670                assert!(hello.features.contains(&"event".to_string()));
1671            }
1672            _ => panic!("Expected Hello message"),
1673        }
1674
1675        assert_eq!(frame.flags.qos, QoS::Fire);
1676        assert_eq!(frame.flags.version, 1); // binary encoding
1677    }
1678
1679    #[test]
1680    fn test_set_roundtrip() {
1681        let msg = Message::Set(SetMessage {
1682            address: "/test/value".to_string(),
1683            value: Value::Float(0.75),
1684            revision: Some(42),
1685            lock: false,
1686            unlock: false, ttl: None,
1687        });
1688
1689        let encoded = encode(&msg).unwrap();
1690        let (decoded, frame) = decode(&encoded).unwrap();
1691
1692        match decoded {
1693            Message::Set(set) => {
1694                assert_eq!(set.address, "/test/value");
1695                assert_eq!(set.value.as_f64(), Some(0.75));
1696                assert_eq!(set.revision, Some(42));
1697            }
1698            _ => panic!("Expected Set message"),
1699        }
1700
1701        assert_eq!(frame.flags.qos, QoS::Confirm);
1702    }
1703
1704    #[test]
1705    fn test_set_size_reduction() {
1706        let msg = Message::Set(SetMessage {
1707            address: "/test/value".to_string(),
1708            value: Value::Float(0.5),
1709            revision: Some(1),
1710            lock: false,
1711            unlock: false, ttl: None,
1712        });
1713
1714        // Binary encoding
1715        let binary_payload = encode_message(&msg).unwrap();
1716
1717        // MessagePack encoding (named keys, legacy)
1718        let msgpack_payload = rmp_serde::to_vec_named(&msg).unwrap();
1719
1720        println!("Binary payload: {} bytes", binary_payload.len());
1721        println!("MessagePack payload: {} bytes", msgpack_payload.len());
1722
1723        // Binary encoding should be significantly smaller (target: ~32 bytes vs ~69 bytes)
1724        assert!(
1725            binary_payload.len() < msgpack_payload.len(),
1726            "Binary encoding ({}) should be smaller than MessagePack ({})",
1727            binary_payload.len(),
1728            msgpack_payload.len()
1729        );
1730
1731        // Binary encoding should be at least 40% smaller
1732        let savings = 100 - (binary_payload.len() * 100 / msgpack_payload.len());
1733        println!("Size reduction: {}%", savings);
1734        assert!(
1735            savings >= 40,
1736            "Expected at least 40% size reduction, got {}%",
1737            savings
1738        );
1739    }
1740
1741    #[test]
1742    fn test_bundle_roundtrip() {
1743        let msg = Message::Bundle(BundleMessage {
1744            timestamp: Some(1000000),
1745            messages: vec![
1746                Message::Set(SetMessage {
1747                    address: "/light/1".to_string(),
1748                    value: Value::Float(1.0),
1749                    revision: None,
1750                    lock: false,
1751                    unlock: false, ttl: None,
1752                }),
1753                Message::Set(SetMessage {
1754                    address: "/light/2".to_string(),
1755                    value: Value::Float(0.0),
1756                    revision: None,
1757                    lock: false,
1758                    unlock: false, ttl: None,
1759                }),
1760            ],
1761        });
1762
1763        let encoded = encode(&msg).unwrap();
1764        let (decoded, _) = decode(&encoded).unwrap();
1765
1766        match decoded {
1767            Message::Bundle(bundle) => {
1768                assert_eq!(bundle.timestamp, Some(1000000));
1769                assert_eq!(bundle.messages.len(), 2);
1770            }
1771            _ => panic!("Expected Bundle message"),
1772        }
1773    }
1774
1775    #[test]
1776    fn test_value_types() {
1777        let values = vec![
1778            Value::Null,
1779            Value::Bool(true),
1780            Value::Int(42),
1781            Value::Float(1.25),
1782            Value::String("hello".to_string()),
1783            Value::Array(vec![Value::Int(1), Value::Int(2), Value::Int(3)]),
1784        ];
1785
1786        for value in values {
1787            let msg = Message::Set(SetMessage {
1788                address: "/test".to_string(),
1789                value: value.clone(),
1790                revision: None,
1791                lock: false,
1792                unlock: false, ttl: None,
1793            });
1794
1795            let encoded = encode(&msg).unwrap();
1796            let (decoded, _) = decode(&encoded).unwrap();
1797
1798            match decoded {
1799                Message::Set(set) => {
1800                    assert_eq!(set.value, value);
1801                }
1802                _ => panic!("Expected Set message"),
1803            }
1804        }
1805    }
1806
1807    #[test]
1808    fn test_backward_compat_v2_decode() {
1809        // Create a v2 MessagePack encoded message
1810        let msg = SetMessage {
1811            address: "/test/value".to_string(),
1812            value: Value::Float(0.5),
1813            revision: Some(1),
1814            lock: false,
1815            unlock: false, ttl: None,
1816        };
1817
1818        // Encode as v2 (MessagePack with named keys)
1819        let v2_bytes = rmp_serde::to_vec_named(&Message::Set(msg.clone())).unwrap();
1820
1821        // Should still decode correctly
1822        let decoded = decode_message(&v2_bytes).unwrap();
1823
1824        match decoded {
1825            Message::Set(set) => {
1826                assert_eq!(set.address, "/test/value");
1827                assert_eq!(set.value.as_f64(), Some(0.5));
1828            }
1829            _ => panic!("Expected Set message"),
1830        }
1831    }
1832
1833    #[test]
1834    fn test_ping_pong() {
1835        let ping = encode(&Message::Ping).unwrap();
1836        let (decoded, _) = decode(&ping).unwrap();
1837        assert!(matches!(decoded, Message::Ping));
1838
1839        let pong = encode(&Message::Pong).unwrap();
1840        let (decoded, _) = decode(&pong).unwrap();
1841        assert!(matches!(decoded, Message::Pong));
1842    }
1843
1844    #[test]
1845    fn test_publish_event() {
1846        let msg = Message::Publish(PublishMessage {
1847            address: "/cue/fire".to_string(),
1848            signal: Some(SignalType::Event),
1849            value: None,
1850            payload: Some(Value::String("intro".to_string())),
1851            samples: None,
1852            rate: None,
1853            id: None,
1854            phase: None,
1855            timestamp: Some(1234567890),
1856            timeline: None,
1857        });
1858
1859        let encoded = encode(&msg).unwrap();
1860        let (decoded, _) = decode(&encoded).unwrap();
1861
1862        match decoded {
1863            Message::Publish(pub_msg) => {
1864                assert_eq!(pub_msg.address, "/cue/fire");
1865                assert_eq!(pub_msg.signal, Some(SignalType::Event));
1866                assert_eq!(pub_msg.timestamp, Some(1234567890));
1867            }
1868            _ => panic!("Expected Publish message"),
1869        }
1870    }
1871
1872    #[test]
1873    fn test_subscribe_roundtrip() {
1874        let msg = Message::Subscribe(SubscribeMessage {
1875            id: 42,
1876            pattern: "/lumen/scene/*/layer/**".to_string(),
1877            types: vec![SignalType::Param, SignalType::Stream],
1878            options: Some(SubscribeOptions {
1879                max_rate: Some(60),
1880                epsilon: Some(0.01),
1881                history: None,
1882                window: None,
1883            }),
1884        });
1885
1886        let encoded = encode(&msg).unwrap();
1887        let (decoded, _) = decode(&encoded).unwrap();
1888
1889        match decoded {
1890            Message::Subscribe(sub) => {
1891                assert_eq!(sub.id, 42);
1892                assert_eq!(sub.pattern, "/lumen/scene/*/layer/**");
1893                assert!(sub.types.contains(&SignalType::Param));
1894                assert!(sub.types.contains(&SignalType::Stream));
1895                assert_eq!(sub.options.as_ref().unwrap().max_rate, Some(60));
1896            }
1897            _ => panic!("Expected Subscribe message"),
1898        }
1899    }
1900}