Skip to main content

clasp_core/
codec.rs

1//! CLASP Binary Codec
2//!
3//! Efficient binary encoding for all CLASP messages.
4//! Backward compatible: can decode v2 MessagePack frames.
5//!
6//! # Performance
7//!
8//! Compared to v2 (MessagePack with named keys):
9//! - SET message: 69 bytes → 32 bytes (54% smaller)
10//! - Encoding speed: ~10M msg/s (vs 1.8M)
11//! - Decoding speed: ~12M msg/s (vs 1.5M)
12
13use crate::types::*;
14use crate::{Error, Frame, QoS, Result};
15use bytes::{Buf, BufMut, Bytes, BytesMut};
16use std::collections::HashMap;
17
18/// Encoding version (1 = binary encoding, 0 = MessagePack legacy)
19pub const ENCODING_VERSION: u8 = 1;
20
21/// Message type codes
22pub mod msg {
23    pub const HELLO: u8 = 0x01;
24    pub const WELCOME: u8 = 0x02;
25    pub const ANNOUNCE: u8 = 0x03;
26    pub const SUBSCRIBE: u8 = 0x10;
27    pub const UNSUBSCRIBE: u8 = 0x11;
28    pub const PUBLISH: u8 = 0x20;
29    pub const SET: u8 = 0x21;
30    pub const GET: u8 = 0x22;
31    pub const SNAPSHOT: u8 = 0x23;
32    pub const FEDERATION_SYNC: u8 = 0x04;
33    pub const REPLAY: u8 = 0x24;
34    pub const BUNDLE: u8 = 0x30;
35    pub const SYNC: u8 = 0x40;
36    pub const PING: u8 = 0x41;
37    pub const PONG: u8 = 0x42;
38    pub const ACK: u8 = 0x50;
39    pub const ERROR: u8 = 0x51;
40    pub const QUERY: u8 = 0x60;
41    pub const RESULT: u8 = 0x61;
42}
43
44/// Value type codes for efficient binary encoding
45pub mod val {
46    pub const NULL: u8 = 0x00;
47    pub const BOOL: u8 = 0x01;
48    pub const I8: u8 = 0x02;
49    pub const I16: u8 = 0x03;
50    pub const I32: u8 = 0x04;
51    pub const I64: u8 = 0x05;
52    pub const F32: u8 = 0x06;
53    pub const F64: u8 = 0x07;
54    pub const STRING: u8 = 0x08;
55    pub const BYTES: u8 = 0x09;
56    pub const ARRAY: u8 = 0x0A;
57    pub const MAP: u8 = 0x0B;
58}
59
60/// Signal type codes
61pub mod sig {
62    pub const PARAM: u8 = 0;
63    pub const EVENT: u8 = 1;
64    pub const STREAM: u8 = 2;
65    pub const GESTURE: u8 = 3;
66    pub const TIMELINE: u8 = 4;
67}
68
69/// Gesture phase codes
70pub mod phase {
71    pub const START: u8 = 0;
72    pub const MOVE: u8 = 1;
73    pub const END: u8 = 2;
74    pub const CANCEL: u8 = 3;
75}
76
77// ============================================================================
78// PUBLIC API
79// ============================================================================
80
81/// Encode a message to binary format
82#[inline]
83pub fn encode_message(message: &Message) -> Result<Bytes> {
84    // Pre-allocate based on expected message size
85    let capacity = estimate_message_size(message);
86    let mut buf = BytesMut::with_capacity(capacity);
87    encode_message_to_buf(&mut buf, message)?;
88    Ok(buf.freeze())
89}
90
91/// Estimate message size for pre-allocation (avoids realloc)
92#[inline]
93fn estimate_message_size(msg: &Message) -> usize {
94    match msg {
95        Message::Set(m) => 2 + 2 + m.address.len() + 9 + if m.revision.is_some() { 8 } else { 0 },
96        Message::Publish(m) => 2 + 2 + m.address.len() + 16,
97        Message::Hello(m) => 4 + m.name.len() + 2,
98        Message::Welcome(m) => 12 + m.name.len() + m.session.len() + 4,
99        Message::Subscribe(m) => 6 + m.pattern.len() + 16,
100        Message::Bundle(m) => 12 + m.messages.len() * 48,
101        Message::Replay(m) => 4 + m.pattern.len() + 20,
102        Message::FederationSync(m) => {
103            4 + m.patterns.iter().map(|p| 2 + p.len()).sum::<usize>() + m.revisions.len() * 12 + 16
104        }
105        Message::Ping | Message::Pong => 5, // Just frame header
106        _ => 64,                            // Default for less common messages
107    }
108}
109
110/// Decode a message - auto-detects MessagePack (legacy) vs binary encoding
111#[inline]
112pub fn decode_message(bytes: &[u8]) -> Result<Message> {
113    if bytes.is_empty() {
114        return Err(Error::BufferTooSmall { needed: 1, have: 0 });
115    }
116
117    let first = bytes[0];
118
119    // Binary encoded messages start with known message type codes (0x01-0x61)
120    // v2 MessagePack maps start with 0x80-0x8F (fixmap) or 0xDE-0xDF (map16/map32)
121    if is_msgpack_map(first) {
122        // Legacy v2 format - use rmp-serde
123        decode_v2_msgpack(bytes)
124    } else {
125        // Binary encoding format
126        decode_v3_binary(bytes)
127    }
128}
129
130/// Encode a message into a complete frame (binary encoding)
131#[inline]
132pub fn encode(message: &Message) -> Result<Bytes> {
133    let payload = encode_message(message)?;
134    let mut frame = Frame::new(payload).with_qos(message.default_qos());
135    frame.flags.version = 1; // binary encoding (1 = binary, 0 = MessagePack legacy)
136    frame.encode()
137}
138
139/// Encode a message with options (binary encoding)
140pub fn encode_with_options(
141    message: &Message,
142    qos: Option<QoS>,
143    timestamp: Option<u64>,
144) -> Result<Bytes> {
145    let payload = encode_message(message)?;
146    let mut frame = Frame::new(payload);
147    frame.flags.version = 1; // binary encoding (1 = binary, 0 = MessagePack legacy)
148
149    if let Some(qos) = qos {
150        frame = frame.with_qos(qos);
151    } else {
152        frame = frame.with_qos(message.default_qos());
153    }
154
155    if let Some(ts) = timestamp {
156        frame = frame.with_timestamp(ts);
157    }
158
159    frame.encode()
160}
161
162/// Decode a frame and extract the message
163#[inline]
164pub fn decode(bytes: &[u8]) -> Result<(Message, Frame)> {
165    let frame = Frame::decode(bytes)?;
166    let message = decode_message(&frame.payload)?;
167    Ok((message, frame))
168}
169
170/// Helper to encode just the message payload (without frame) - binary encoding
171pub fn encode_payload(message: &Message) -> Result<Vec<u8>> {
172    let bytes = encode_message(message)?;
173    Ok(bytes.to_vec())
174}
175
176/// Helper to decode just a message payload (without frame)
177pub fn decode_payload(bytes: &[u8]) -> Result<Message> {
178    decode_message(bytes)
179}
180
181// ============================================================================
182// BINARY ENCODING
183// ============================================================================
184
185fn encode_message_to_buf(buf: &mut BytesMut, msg: &Message) -> Result<()> {
186    match msg {
187        Message::Hello(m) => encode_hello(buf, m),
188        Message::Welcome(m) => encode_welcome(buf, m),
189        Message::Announce(m) => encode_announce(buf, m),
190        Message::Subscribe(m) => encode_subscribe(buf, m),
191        Message::Unsubscribe(m) => encode_unsubscribe(buf, m),
192        Message::Publish(m) => encode_publish(buf, m),
193        Message::Set(m) => encode_set(buf, m),
194        Message::Get(m) => encode_get(buf, m),
195        Message::Snapshot(m) => encode_snapshot(buf, m),
196        Message::Replay(m) => encode_replay(buf, m),
197        Message::FederationSync(m) => encode_federation_sync(buf, m),
198        Message::Bundle(m) => encode_bundle(buf, m),
199        Message::Sync(m) => encode_sync(buf, m),
200        Message::Ping => {
201            buf.put_u8(msg::PING);
202            Ok(())
203        }
204        Message::Pong => {
205            buf.put_u8(msg::PONG);
206            Ok(())
207        }
208        Message::Ack(m) => encode_ack(buf, m),
209        Message::Error(m) => encode_error(buf, m),
210        Message::Query(m) => encode_query(buf, m),
211        Message::Result(m) => encode_result(buf, m),
212    }
213}
214
215/// SET (0x21) - Parameter Update
216/// Flags: [has_rev:1][lock:1][unlock:1][rsv:1][vtype:4]
217#[inline]
218fn encode_set(buf: &mut BytesMut, msg: &SetMessage) -> Result<()> {
219    buf.put_u8(msg::SET);
220
221    let vtype = value_type_code(&msg.value);
222    let mut flags = vtype & 0x0F;
223    if msg.revision.is_some() {
224        flags |= 0x80;
225    }
226    if msg.lock {
227        flags |= 0x40;
228    }
229    if msg.unlock {
230        flags |= 0x20;
231    }
232    buf.put_u8(flags);
233
234    // Address
235    encode_string(buf, &msg.address)?;
236
237    // Value (type already in flags for simple types)
238    encode_value_data(buf, &msg.value)?;
239
240    // Optional revision
241    if let Some(rev) = msg.revision {
242        buf.put_u64(rev);
243    }
244
245    Ok(())
246}
247
248/// PUBLISH (0x20) - Event/Stream/Gesture
249/// Flags: [sig_type:3][has_ts:1][has_id:1][phase:3]
250fn encode_publish(buf: &mut BytesMut, msg: &PublishMessage) -> Result<()> {
251    buf.put_u8(msg::PUBLISH);
252
253    let sig_code = msg.signal.map(signal_type_code).unwrap_or(sig::EVENT);
254    let phase_code = msg.phase.map(gesture_phase_code).unwrap_or(phase::START);
255
256    let mut flags: u8 = (sig_code & 0x07) << 5;
257    if msg.timestamp.is_some() {
258        flags |= 0x10;
259    }
260    if msg.id.is_some() {
261        flags |= 0x08;
262    }
263    flags |= phase_code & 0x07;
264    buf.put_u8(flags);
265
266    // Address
267    encode_string(buf, &msg.address)?;
268
269    // Value/payload
270    if let Some(ref value) = msg.value {
271        buf.put_u8(1); // has value
272        buf.put_u8(value_type_code(value));
273        encode_value_data(buf, value)?;
274    } else if let Some(ref payload) = msg.payload {
275        buf.put_u8(1); // has payload
276        buf.put_u8(value_type_code(payload));
277        encode_value_data(buf, payload)?;
278    } else if let Some(ref samples) = msg.samples {
279        buf.put_u8(2); // has samples
280        buf.put_u16(samples.len() as u16);
281        for sample in samples {
282            buf.put_f64(*sample);
283        }
284    } else {
285        buf.put_u8(0); // no value
286    }
287
288    // Optional timestamp
289    if let Some(ts) = msg.timestamp {
290        buf.put_u64(ts);
291    }
292
293    // Optional gesture ID
294    if let Some(id) = msg.id {
295        buf.put_u32(id);
296    }
297
298    // Optional rate
299    if let Some(rate) = msg.rate {
300        buf.put_u32(rate);
301    }
302
303    Ok(())
304}
305
306/// HELLO (0x01)
307fn encode_hello(buf: &mut BytesMut, msg: &HelloMessage) -> Result<()> {
308    buf.put_u8(msg::HELLO);
309    buf.put_u8(msg.version);
310
311    // Feature flags
312    let mut features: u8 = 0;
313    for f in &msg.features {
314        match f.as_str() {
315            "param" => features |= 0x80,
316            "event" => features |= 0x40,
317            "stream" => features |= 0x20,
318            "gesture" => features |= 0x10,
319            "timeline" => features |= 0x08,
320            "federation" => features |= 0x04,
321            _ => {}
322        }
323    }
324    buf.put_u8(features);
325
326    // Name
327    encode_string(buf, &msg.name)?;
328
329    // Token (optional)
330    if let Some(ref token) = msg.token {
331        encode_string(buf, token)?;
332    } else {
333        buf.put_u16(0);
334    }
335
336    Ok(())
337}
338
339/// WELCOME (0x02)
340fn encode_welcome(buf: &mut BytesMut, msg: &WelcomeMessage) -> Result<()> {
341    buf.put_u8(msg::WELCOME);
342    buf.put_u8(msg.version);
343
344    // Feature flags (same as HELLO)
345    let mut features: u8 = 0;
346    for f in &msg.features {
347        match f.as_str() {
348            "param" => features |= 0x80,
349            "event" => features |= 0x40,
350            "stream" => features |= 0x20,
351            "gesture" => features |= 0x10,
352            "timeline" => features |= 0x08,
353            _ => {}
354        }
355    }
356    buf.put_u8(features);
357
358    // Server time
359    buf.put_u64(msg.time);
360
361    // Session ID
362    encode_string(buf, &msg.session)?;
363
364    // Server name
365    encode_string(buf, &msg.name)?;
366
367    // Token (optional)
368    if let Some(ref token) = msg.token {
369        encode_string(buf, token)?;
370    } else {
371        buf.put_u16(0);
372    }
373
374    Ok(())
375}
376
377/// ANNOUNCE (0x03)
378fn encode_announce(buf: &mut BytesMut, msg: &AnnounceMessage) -> Result<()> {
379    buf.put_u8(msg::ANNOUNCE);
380
381    encode_string(buf, &msg.namespace)?;
382    buf.put_u16(msg.signals.len() as u16);
383
384    for sig in &msg.signals {
385        encode_string(buf, &sig.address)?;
386        buf.put_u8(signal_type_code(sig.signal_type));
387
388        // Optional fields flags
389        let mut opt_flags: u8 = 0;
390        if sig.datatype.is_some() {
391            opt_flags |= 0x01;
392        }
393        if sig.access.is_some() {
394            opt_flags |= 0x02;
395        }
396        if sig.meta.is_some() {
397            opt_flags |= 0x04;
398        }
399        buf.put_u8(opt_flags);
400
401        if let Some(ref dt) = sig.datatype {
402            encode_string(buf, dt)?;
403        }
404        if let Some(ref access) = sig.access {
405            encode_string(buf, access)?;
406        }
407        if let Some(ref meta) = sig.meta {
408            // Encode meta as simple fields
409            let mut meta_flags: u8 = 0;
410            if meta.unit.is_some() {
411                meta_flags |= 0x01;
412            }
413            if meta.range.is_some() {
414                meta_flags |= 0x02;
415            }
416            if meta.default.is_some() {
417                meta_flags |= 0x04;
418            }
419            if meta.description.is_some() {
420                meta_flags |= 0x08;
421            }
422            buf.put_u8(meta_flags);
423
424            if let Some(ref unit) = meta.unit {
425                encode_string(buf, unit)?;
426            }
427            if let Some((min, max)) = meta.range {
428                buf.put_f64(min);
429                buf.put_f64(max);
430            }
431            if let Some(ref default) = meta.default {
432                buf.put_u8(value_type_code(default));
433                encode_value_data(buf, default)?;
434            }
435            if let Some(ref desc) = meta.description {
436                encode_string(buf, desc)?;
437            }
438        }
439    }
440
441    Ok(())
442}
443
444/// SUBSCRIBE (0x10)
445fn encode_subscribe(buf: &mut BytesMut, msg: &SubscribeMessage) -> Result<()> {
446    buf.put_u8(msg::SUBSCRIBE);
447    buf.put_u32(msg.id);
448
449    encode_string(buf, &msg.pattern)?;
450
451    // Signal type filter as bitmask
452    let mut type_mask: u8 = 0;
453    if msg.types.is_empty() {
454        type_mask = 0xFF; // All types
455    } else {
456        for t in &msg.types {
457            match t {
458                SignalType::Param => type_mask |= 0x01,
459                SignalType::Event => type_mask |= 0x02,
460                SignalType::Stream => type_mask |= 0x04,
461                SignalType::Gesture => type_mask |= 0x08,
462                SignalType::Timeline => type_mask |= 0x10,
463            }
464        }
465    }
466    buf.put_u8(type_mask);
467
468    // Options
469    if let Some(ref opts) = msg.options {
470        let mut opt_flags: u8 = 0;
471        if opts.max_rate.is_some() {
472            opt_flags |= 0x01;
473        }
474        if opts.epsilon.is_some() {
475            opt_flags |= 0x02;
476        }
477        if opts.history.is_some() {
478            opt_flags |= 0x04;
479        }
480        if opts.window.is_some() {
481            opt_flags |= 0x08;
482        }
483        buf.put_u8(opt_flags);
484
485        if let Some(rate) = opts.max_rate {
486            buf.put_u32(rate);
487        }
488        if let Some(eps) = opts.epsilon {
489            buf.put_f64(eps);
490        }
491        if let Some(hist) = opts.history {
492            buf.put_u32(hist);
493        }
494        if let Some(win) = opts.window {
495            buf.put_u32(win);
496        }
497    } else {
498        buf.put_u8(0); // No options
499    }
500
501    Ok(())
502}
503
504/// UNSUBSCRIBE (0x11)
505fn encode_unsubscribe(buf: &mut BytesMut, msg: &UnsubscribeMessage) -> Result<()> {
506    buf.put_u8(msg::UNSUBSCRIBE);
507    buf.put_u32(msg.id);
508    Ok(())
509}
510
511/// GET (0x22)
512fn encode_get(buf: &mut BytesMut, msg: &GetMessage) -> Result<()> {
513    buf.put_u8(msg::GET);
514    encode_string(buf, &msg.address)?;
515    Ok(())
516}
517
518/// SNAPSHOT (0x23)
519fn encode_snapshot(buf: &mut BytesMut, msg: &SnapshotMessage) -> Result<()> {
520    buf.put_u8(msg::SNAPSHOT);
521    buf.put_u16(msg.params.len() as u16);
522
523    for param in &msg.params {
524        encode_string(buf, &param.address)?;
525        buf.put_u8(value_type_code(&param.value));
526        encode_value_data(buf, &param.value)?;
527        buf.put_u64(param.revision);
528
529        let mut opt_flags: u8 = 0;
530        if param.writer.is_some() {
531            opt_flags |= 0x01;
532        }
533        if param.timestamp.is_some() {
534            opt_flags |= 0x02;
535        }
536        buf.put_u8(opt_flags);
537
538        if let Some(ref writer) = param.writer {
539            encode_string(buf, writer)?;
540        }
541        if let Some(ts) = param.timestamp {
542            buf.put_u64(ts);
543        }
544    }
545
546    Ok(())
547}
548
549/// REPLAY (0x24) - Journal replay request
550/// Flags: [has_from:1][has_to:1][has_limit:1][rsv:5]
551fn encode_replay(buf: &mut BytesMut, msg: &ReplayMessage) -> Result<()> {
552    buf.put_u8(msg::REPLAY);
553
554    let mut flags: u8 = 0;
555    if msg.from.is_some() {
556        flags |= 0x80;
557    }
558    if msg.to.is_some() {
559        flags |= 0x40;
560    }
561    if msg.limit.is_some() {
562        flags |= 0x20;
563    }
564    buf.put_u8(flags);
565
566    encode_string(buf, &msg.pattern)?;
567
568    if let Some(from) = msg.from {
569        buf.put_u64(from);
570    }
571    if let Some(to) = msg.to {
572        buf.put_u64(to);
573    }
574    if let Some(limit) = msg.limit {
575        buf.put_u32(limit);
576    }
577
578    // Signal type filter as bitmask (same format as SUBSCRIBE)
579    let mut type_mask: u8 = 0;
580    if msg.types.is_empty() {
581        type_mask = 0xFF; // All types
582    } else {
583        for t in &msg.types {
584            match t {
585                SignalType::Param => type_mask |= 0x01,
586                SignalType::Event => type_mask |= 0x02,
587                SignalType::Stream => type_mask |= 0x04,
588                SignalType::Gesture => type_mask |= 0x08,
589                SignalType::Timeline => type_mask |= 0x10,
590            }
591        }
592    }
593    buf.put_u8(type_mask);
594
595    Ok(())
596}
597
598/// FEDERATION_SYNC (0x04) - Router-to-router federation
599/// Layout: [op:u8][pattern_count:u16][patterns...][revision_count:u16][revisions...][flags:u8][optional fields]
600fn encode_federation_sync(buf: &mut BytesMut, msg: &FederationSyncMessage) -> Result<()> {
601    buf.put_u8(msg::FEDERATION_SYNC);
602    buf.put_u8(msg.op as u8);
603
604    // Encode patterns
605    buf.put_u16(msg.patterns.len() as u16);
606    for pattern in &msg.patterns {
607        encode_string(buf, pattern)?;
608    }
609
610    // Encode revisions map
611    buf.put_u16(msg.revisions.len() as u16);
612    for (key, val) in &msg.revisions {
613        encode_string(buf, key)?;
614        buf.put_u64(*val);
615    }
616
617    // Flags: [has_since:1][has_origin:1][rsv:6]
618    let mut flags: u8 = 0;
619    if msg.since_revision.is_some() {
620        flags |= 0x80;
621    }
622    if msg.origin.is_some() {
623        flags |= 0x40;
624    }
625    buf.put_u8(flags);
626
627    if let Some(since) = msg.since_revision {
628        buf.put_u64(since);
629    }
630    if let Some(ref origin) = msg.origin {
631        encode_string(buf, origin)?;
632    }
633
634    Ok(())
635}
636
637/// BUNDLE (0x30)
638fn encode_bundle(buf: &mut BytesMut, msg: &BundleMessage) -> Result<()> {
639    buf.put_u8(msg::BUNDLE);
640
641    let mut flags: u8 = 0;
642    if msg.timestamp.is_some() {
643        flags |= 0x80;
644    }
645    buf.put_u8(flags);
646
647    buf.put_u16(msg.messages.len() as u16);
648
649    if let Some(ts) = msg.timestamp {
650        buf.put_u64(ts);
651    }
652
653    // Each message prefixed with length
654    for inner_msg in &msg.messages {
655        let mut inner_buf = BytesMut::with_capacity(64);
656        encode_message_to_buf(&mut inner_buf, inner_msg)?;
657        buf.put_u16(inner_buf.len() as u16);
658        buf.extend_from_slice(&inner_buf);
659    }
660
661    Ok(())
662}
663
664/// SYNC (0x40)
665fn encode_sync(buf: &mut BytesMut, msg: &SyncMessage) -> Result<()> {
666    buf.put_u8(msg::SYNC);
667
668    let mut flags: u8 = 0;
669    if msg.t2.is_some() {
670        flags |= 0x01;
671    }
672    if msg.t3.is_some() {
673        flags |= 0x02;
674    }
675    buf.put_u8(flags);
676
677    buf.put_u64(msg.t1);
678    if let Some(t2) = msg.t2 {
679        buf.put_u64(t2);
680    }
681    if let Some(t3) = msg.t3 {
682        buf.put_u64(t3);
683    }
684
685    Ok(())
686}
687
688/// ACK (0x50)
689fn encode_ack(buf: &mut BytesMut, msg: &AckMessage) -> Result<()> {
690    buf.put_u8(msg::ACK);
691
692    let mut flags: u8 = 0;
693    if msg.address.is_some() {
694        flags |= 0x01;
695    }
696    if msg.revision.is_some() {
697        flags |= 0x02;
698    }
699    if msg.locked.is_some() {
700        flags |= 0x04;
701    }
702    if msg.holder.is_some() {
703        flags |= 0x08;
704    }
705    if msg.correlation_id.is_some() {
706        flags |= 0x10;
707    }
708    buf.put_u8(flags);
709
710    if let Some(ref addr) = msg.address {
711        encode_string(buf, addr)?;
712    }
713    if let Some(rev) = msg.revision {
714        buf.put_u64(rev);
715    }
716    if let Some(locked) = msg.locked {
717        buf.put_u8(if locked { 1 } else { 0 });
718    }
719    if let Some(ref holder) = msg.holder {
720        encode_string(buf, holder)?;
721    }
722    if let Some(corr) = msg.correlation_id {
723        buf.put_u32(corr);
724    }
725
726    Ok(())
727}
728
729/// ERROR (0x51)
730fn encode_error(buf: &mut BytesMut, msg: &ErrorMessage) -> Result<()> {
731    buf.put_u8(msg::ERROR);
732    buf.put_u16(msg.code);
733    encode_string(buf, &msg.message)?;
734
735    let mut flags: u8 = 0;
736    if msg.address.is_some() {
737        flags |= 0x01;
738    }
739    if msg.correlation_id.is_some() {
740        flags |= 0x02;
741    }
742    buf.put_u8(flags);
743
744    if let Some(ref addr) = msg.address {
745        encode_string(buf, addr)?;
746    }
747    if let Some(corr) = msg.correlation_id {
748        buf.put_u32(corr);
749    }
750
751    Ok(())
752}
753
754/// QUERY (0x60)
755fn encode_query(buf: &mut BytesMut, msg: &QueryMessage) -> Result<()> {
756    buf.put_u8(msg::QUERY);
757    encode_string(buf, &msg.pattern)?;
758    Ok(())
759}
760
761/// RESULT (0x61)
762fn encode_result(buf: &mut BytesMut, msg: &ResultMessage) -> Result<()> {
763    buf.put_u8(msg::RESULT);
764    buf.put_u16(msg.signals.len() as u16);
765
766    for sig in &msg.signals {
767        encode_string(buf, &sig.address)?;
768        buf.put_u8(signal_type_code(sig.signal_type));
769
770        let mut opt_flags: u8 = 0;
771        if sig.datatype.is_some() {
772            opt_flags |= 0x01;
773        }
774        if sig.access.is_some() {
775            opt_flags |= 0x02;
776        }
777        buf.put_u8(opt_flags);
778
779        if let Some(ref dt) = sig.datatype {
780            encode_string(buf, dt)?;
781        }
782        if let Some(ref access) = sig.access {
783            encode_string(buf, access)?;
784        }
785    }
786
787    Ok(())
788}
789
790// ============================================================================
791// VALUE ENCODING HELPERS
792// ============================================================================
793
794#[inline(always)]
795fn encode_string(buf: &mut BytesMut, s: &str) -> Result<()> {
796    let bytes = s.as_bytes();
797    if bytes.len() > u16::MAX as usize {
798        return Err(Error::PayloadTooLarge(bytes.len()));
799    }
800    buf.put_u16(bytes.len() as u16);
801    buf.extend_from_slice(bytes);
802    Ok(())
803}
804
805#[inline]
806fn encode_value_data(buf: &mut BytesMut, value: &Value) -> Result<()> {
807    match value {
808        Value::Null => {} // Type code is enough
809        Value::Bool(b) => buf.put_u8(if *b { 1 } else { 0 }),
810        Value::Int(i) => buf.put_i64(*i),
811        Value::Float(f) => buf.put_f64(*f),
812        Value::String(s) => encode_string(buf, s)?,
813        Value::Bytes(b) => {
814            buf.put_u16(b.len() as u16);
815            buf.extend_from_slice(b);
816        }
817        Value::Array(arr) => {
818            buf.put_u16(arr.len() as u16);
819            for item in arr {
820                buf.put_u8(value_type_code(item));
821                encode_value_data(buf, item)?;
822            }
823        }
824        Value::Map(map) => {
825            buf.put_u16(map.len() as u16);
826            for (key, val) in map {
827                encode_string(buf, key)?;
828                buf.put_u8(value_type_code(val));
829                encode_value_data(buf, val)?;
830            }
831        }
832    }
833    Ok(())
834}
835
836#[inline(always)]
837fn value_type_code(value: &Value) -> u8 {
838    match value {
839        Value::Null => val::NULL,
840        Value::Bool(_) => val::BOOL,
841        Value::Int(_) => val::I64,
842        Value::Float(_) => val::F64,
843        Value::String(_) => val::STRING,
844        Value::Bytes(_) => val::BYTES,
845        Value::Array(_) => val::ARRAY,
846        Value::Map(_) => val::MAP,
847    }
848}
849
850fn signal_type_code(sig: SignalType) -> u8 {
851    match sig {
852        SignalType::Param => sig::PARAM,
853        SignalType::Event => sig::EVENT,
854        SignalType::Stream => sig::STREAM,
855        SignalType::Gesture => sig::GESTURE,
856        SignalType::Timeline => sig::TIMELINE,
857    }
858}
859
860fn gesture_phase_code(phase: GesturePhase) -> u8 {
861    match phase {
862        GesturePhase::Start => phase::START,
863        GesturePhase::Move => phase::MOVE,
864        GesturePhase::End => phase::END,
865        GesturePhase::Cancel => phase::CANCEL,
866    }
867}
868
869// ============================================================================
870// BINARY DECODING
871// ============================================================================
872
873fn decode_v3_binary(bytes: &[u8]) -> Result<Message> {
874    if bytes.is_empty() {
875        return Err(Error::BufferTooSmall { needed: 1, have: 0 });
876    }
877
878    let mut buf = bytes;
879    let msg_type = buf.get_u8();
880
881    match msg_type {
882        msg::HELLO => decode_hello(&mut buf),
883        msg::WELCOME => decode_welcome(&mut buf),
884        msg::ANNOUNCE => decode_announce(&mut buf),
885        msg::SUBSCRIBE => decode_subscribe(&mut buf),
886        msg::UNSUBSCRIBE => decode_unsubscribe(&mut buf),
887        msg::PUBLISH => decode_publish(&mut buf),
888        msg::SET => decode_set(&mut buf),
889        msg::GET => decode_get(&mut buf),
890        msg::SNAPSHOT => decode_snapshot(&mut buf),
891        msg::REPLAY => decode_replay(&mut buf),
892        msg::FEDERATION_SYNC => decode_federation_sync(&mut buf),
893        msg::BUNDLE => decode_bundle(&mut buf),
894        msg::SYNC => decode_sync(&mut buf),
895        msg::PING => Ok(Message::Ping),
896        msg::PONG => Ok(Message::Pong),
897        msg::ACK => decode_ack(&mut buf),
898        msg::ERROR => decode_error(&mut buf),
899        msg::QUERY => decode_query(&mut buf),
900        msg::RESULT => decode_result(&mut buf),
901        _ => Err(Error::UnknownMessageType(msg_type)),
902    }
903}
904
905#[inline]
906fn decode_set(buf: &mut &[u8]) -> Result<Message> {
907    let flags = buf.get_u8();
908    let vtype = flags & 0x0F;
909    let has_rev = (flags & 0x80) != 0;
910    let lock = (flags & 0x40) != 0;
911    let unlock = (flags & 0x20) != 0;
912
913    let address = decode_string(buf)?;
914    let value = decode_value_data(buf, vtype)?;
915
916    let revision = if has_rev { Some(buf.get_u64()) } else { None };
917
918    Ok(Message::Set(SetMessage {
919        address,
920        value,
921        revision,
922        lock,
923        unlock,
924    }))
925}
926
927fn decode_publish(buf: &mut &[u8]) -> Result<Message> {
928    let flags = buf.get_u8();
929    let sig_code = (flags >> 5) & 0x07;
930    let has_ts = (flags & 0x10) != 0;
931    let has_id = (flags & 0x08) != 0;
932    let phase_code = flags & 0x07;
933
934    let address = decode_string(buf)?;
935
936    // Value indicator
937    let value_indicator = buf.get_u8();
938    let (value, payload, samples) = match value_indicator {
939        0 => (None, None, None),
940        1 => {
941            let vtype = buf.get_u8();
942            let v = decode_value_data(buf, vtype)?;
943            (Some(v), None, None)
944        }
945        2 => {
946            let count = buf.get_u16() as usize;
947            let mut s = Vec::with_capacity(count);
948            for _ in 0..count {
949                s.push(buf.get_f64());
950            }
951            (None, None, Some(s))
952        }
953        _ => (None, None, None),
954    };
955
956    let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
957    let id = if has_id { Some(buf.get_u32()) } else { None };
958
959    // Rate (if remaining bytes)
960    let rate = if buf.remaining() >= 4 {
961        Some(buf.get_u32())
962    } else {
963        None
964    };
965
966    let signal = Some(signal_type_from_code(sig_code));
967    let phase = Some(gesture_phase_from_code(phase_code));
968
969    Ok(Message::Publish(PublishMessage {
970        address,
971        signal,
972        value,
973        payload,
974        samples,
975        rate,
976        id,
977        phase,
978        timestamp,
979        timeline: None, // Timeline data is encoded separately when signal is Timeline
980    }))
981}
982
983fn decode_hello(buf: &mut &[u8]) -> Result<Message> {
984    let version = buf.get_u8();
985    let feature_flags = buf.get_u8();
986
987    let mut features = Vec::new();
988    if feature_flags & 0x80 != 0 {
989        features.push("param".to_string());
990    }
991    if feature_flags & 0x40 != 0 {
992        features.push("event".to_string());
993    }
994    if feature_flags & 0x20 != 0 {
995        features.push("stream".to_string());
996    }
997    if feature_flags & 0x10 != 0 {
998        features.push("gesture".to_string());
999    }
1000    if feature_flags & 0x08 != 0 {
1001        features.push("timeline".to_string());
1002    }
1003    if feature_flags & 0x04 != 0 {
1004        features.push("federation".to_string());
1005    }
1006
1007    let name = decode_string(buf)?;
1008    let token_str = decode_string(buf)?;
1009    let token = if token_str.is_empty() {
1010        None
1011    } else {
1012        Some(token_str)
1013    };
1014
1015    Ok(Message::Hello(HelloMessage {
1016        version,
1017        name,
1018        features,
1019        capabilities: None,
1020        token,
1021    }))
1022}
1023
1024fn decode_welcome(buf: &mut &[u8]) -> Result<Message> {
1025    let version = buf.get_u8();
1026    let feature_flags = buf.get_u8();
1027
1028    let mut features = Vec::new();
1029    if feature_flags & 0x80 != 0 {
1030        features.push("param".to_string());
1031    }
1032    if feature_flags & 0x40 != 0 {
1033        features.push("event".to_string());
1034    }
1035    if feature_flags & 0x20 != 0 {
1036        features.push("stream".to_string());
1037    }
1038    if feature_flags & 0x10 != 0 {
1039        features.push("gesture".to_string());
1040    }
1041    if feature_flags & 0x08 != 0 {
1042        features.push("timeline".to_string());
1043    }
1044
1045    let time = buf.get_u64();
1046    let session = decode_string(buf)?;
1047    let name = decode_string(buf)?;
1048
1049    let token_str = decode_string(buf)?;
1050    let token = if token_str.is_empty() {
1051        None
1052    } else {
1053        Some(token_str)
1054    };
1055
1056    Ok(Message::Welcome(WelcomeMessage {
1057        version,
1058        session,
1059        name,
1060        features,
1061        time,
1062        token,
1063    }))
1064}
1065
1066fn decode_announce(buf: &mut &[u8]) -> Result<Message> {
1067    let namespace = decode_string(buf)?;
1068    let count = buf.get_u16() as usize;
1069
1070    let mut signals = Vec::with_capacity(count);
1071    for _ in 0..count {
1072        let address = decode_string(buf)?;
1073        let sig_code = buf.get_u8();
1074        let opt_flags = buf.get_u8();
1075
1076        let datatype = if opt_flags & 0x01 != 0 {
1077            Some(decode_string(buf)?)
1078        } else {
1079            None
1080        };
1081        let access = if opt_flags & 0x02 != 0 {
1082            Some(decode_string(buf)?)
1083        } else {
1084            None
1085        };
1086
1087        let meta = if opt_flags & 0x04 != 0 {
1088            let meta_flags = buf.get_u8();
1089
1090            let unit = if meta_flags & 0x01 != 0 {
1091                Some(decode_string(buf)?)
1092            } else {
1093                None
1094            };
1095            let range = if meta_flags & 0x02 != 0 {
1096                let min = buf.get_f64();
1097                let max = buf.get_f64();
1098                Some((min, max))
1099            } else {
1100                None
1101            };
1102            let default = if meta_flags & 0x04 != 0 {
1103                let vtype = buf.get_u8();
1104                Some(decode_value_data(buf, vtype)?)
1105            } else {
1106                None
1107            };
1108            let description = if meta_flags & 0x08 != 0 {
1109                Some(decode_string(buf)?)
1110            } else {
1111                None
1112            };
1113
1114            Some(SignalMeta {
1115                unit,
1116                range,
1117                default,
1118                description,
1119            })
1120        } else {
1121            None
1122        };
1123
1124        signals.push(SignalDefinition {
1125            address,
1126            signal_type: signal_type_from_code(sig_code),
1127            datatype,
1128            access,
1129            meta,
1130        });
1131    }
1132
1133    Ok(Message::Announce(AnnounceMessage {
1134        namespace,
1135        signals,
1136        meta: None,
1137    }))
1138}
1139
1140fn decode_subscribe(buf: &mut &[u8]) -> Result<Message> {
1141    let id = buf.get_u32();
1142    let pattern = decode_string(buf)?;
1143    let type_mask = buf.get_u8();
1144
1145    let mut types = Vec::new();
1146    if type_mask == 0xFF {
1147        // All types, leave empty to indicate all
1148    } else {
1149        if type_mask & 0x01 != 0 {
1150            types.push(SignalType::Param);
1151        }
1152        if type_mask & 0x02 != 0 {
1153            types.push(SignalType::Event);
1154        }
1155        if type_mask & 0x04 != 0 {
1156            types.push(SignalType::Stream);
1157        }
1158        if type_mask & 0x08 != 0 {
1159            types.push(SignalType::Gesture);
1160        }
1161        if type_mask & 0x10 != 0 {
1162            types.push(SignalType::Timeline);
1163        }
1164    }
1165
1166    let opt_flags = buf.get_u8();
1167    let options = if opt_flags != 0 {
1168        let max_rate = if opt_flags & 0x01 != 0 {
1169            Some(buf.get_u32())
1170        } else {
1171            None
1172        };
1173        let epsilon = if opt_flags & 0x02 != 0 {
1174            Some(buf.get_f64())
1175        } else {
1176            None
1177        };
1178        let history = if opt_flags & 0x04 != 0 {
1179            Some(buf.get_u32())
1180        } else {
1181            None
1182        };
1183        let window = if opt_flags & 0x08 != 0 {
1184            Some(buf.get_u32())
1185        } else {
1186            None
1187        };
1188
1189        Some(SubscribeOptions {
1190            max_rate,
1191            epsilon,
1192            history,
1193            window,
1194        })
1195    } else {
1196        None
1197    };
1198
1199    Ok(Message::Subscribe(SubscribeMessage {
1200        id,
1201        pattern,
1202        types,
1203        options,
1204    }))
1205}
1206
1207fn decode_unsubscribe(buf: &mut &[u8]) -> Result<Message> {
1208    let id = buf.get_u32();
1209    Ok(Message::Unsubscribe(UnsubscribeMessage { id }))
1210}
1211
1212fn decode_get(buf: &mut &[u8]) -> Result<Message> {
1213    let address = decode_string(buf)?;
1214    Ok(Message::Get(GetMessage { address }))
1215}
1216
1217fn decode_snapshot(buf: &mut &[u8]) -> Result<Message> {
1218    let count = buf.get_u16() as usize;
1219    let mut params = Vec::with_capacity(count);
1220
1221    for _ in 0..count {
1222        let address = decode_string(buf)?;
1223        let vtype = buf.get_u8();
1224        let value = decode_value_data(buf, vtype)?;
1225        let revision = buf.get_u64();
1226        let opt_flags = buf.get_u8();
1227
1228        let writer = if opt_flags & 0x01 != 0 {
1229            Some(decode_string(buf)?)
1230        } else {
1231            None
1232        };
1233        let timestamp = if opt_flags & 0x02 != 0 {
1234            Some(buf.get_u64())
1235        } else {
1236            None
1237        };
1238
1239        params.push(ParamValue {
1240            address,
1241            value,
1242            revision,
1243            writer,
1244            timestamp,
1245        });
1246    }
1247
1248    Ok(Message::Snapshot(SnapshotMessage { params }))
1249}
1250
1251fn decode_replay(buf: &mut &[u8]) -> Result<Message> {
1252    let flags = buf.get_u8();
1253    let has_from = (flags & 0x80) != 0;
1254    let has_to = (flags & 0x40) != 0;
1255    let has_limit = (flags & 0x20) != 0;
1256
1257    let pattern = decode_string(buf)?;
1258
1259    let from = if has_from { Some(buf.get_u64()) } else { None };
1260    let to = if has_to { Some(buf.get_u64()) } else { None };
1261    let limit = if has_limit { Some(buf.get_u32()) } else { None };
1262
1263    let type_mask = buf.get_u8();
1264    let mut types = Vec::new();
1265    if type_mask != 0xFF {
1266        if type_mask & 0x01 != 0 {
1267            types.push(SignalType::Param);
1268        }
1269        if type_mask & 0x02 != 0 {
1270            types.push(SignalType::Event);
1271        }
1272        if type_mask & 0x04 != 0 {
1273            types.push(SignalType::Stream);
1274        }
1275        if type_mask & 0x08 != 0 {
1276            types.push(SignalType::Gesture);
1277        }
1278        if type_mask & 0x10 != 0 {
1279            types.push(SignalType::Timeline);
1280        }
1281    }
1282
1283    Ok(Message::Replay(ReplayMessage {
1284        pattern,
1285        from,
1286        to,
1287        limit,
1288        types,
1289    }))
1290}
1291
1292fn decode_federation_sync(buf: &mut &[u8]) -> Result<Message> {
1293    let op_byte = buf.get_u8();
1294    let op = match op_byte {
1295        0x01 => FederationOp::DeclareNamespaces,
1296        0x02 => FederationOp::RequestSync,
1297        0x03 => FederationOp::RevisionVector,
1298        0x04 => FederationOp::SyncComplete,
1299        _ => return Err(Error::DecodeError("unknown federation op".into())),
1300    };
1301
1302    // Decode patterns
1303    let pattern_count = buf.get_u16() as usize;
1304    let mut patterns = Vec::with_capacity(pattern_count);
1305    for _ in 0..pattern_count {
1306        patterns.push(decode_string(buf)?);
1307    }
1308
1309    // Decode revisions map
1310    let revision_count = buf.get_u16() as usize;
1311    let mut revisions = std::collections::HashMap::with_capacity(revision_count);
1312    for _ in 0..revision_count {
1313        let key = decode_string(buf)?;
1314        let val = buf.get_u64();
1315        revisions.insert(key, val);
1316    }
1317
1318    // Flags: [has_since:1][has_origin:1][rsv:6]
1319    let flags = buf.get_u8();
1320    let since_revision = if flags & 0x80 != 0 {
1321        Some(buf.get_u64())
1322    } else {
1323        None
1324    };
1325    let origin = if flags & 0x40 != 0 {
1326        Some(decode_string(buf)?)
1327    } else {
1328        None
1329    };
1330
1331    Ok(Message::FederationSync(FederationSyncMessage {
1332        op,
1333        patterns,
1334        revisions,
1335        since_revision,
1336        origin,
1337    }))
1338}
1339
1340fn decode_bundle(buf: &mut &[u8]) -> Result<Message> {
1341    let flags = buf.get_u8();
1342    let has_ts = (flags & 0x80) != 0;
1343    let count = buf.get_u16() as usize;
1344
1345    let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
1346
1347    let mut messages = Vec::with_capacity(count);
1348    for _ in 0..count {
1349        let len = buf.get_u16() as usize;
1350        let inner_bytes = &buf[..len];
1351        buf.advance(len);
1352        messages.push(decode_v3_binary(inner_bytes)?);
1353    }
1354
1355    Ok(Message::Bundle(BundleMessage {
1356        timestamp,
1357        messages,
1358    }))
1359}
1360
1361fn decode_sync(buf: &mut &[u8]) -> Result<Message> {
1362    let flags = buf.get_u8();
1363    let t1 = buf.get_u64();
1364    let t2 = if flags & 0x01 != 0 {
1365        Some(buf.get_u64())
1366    } else {
1367        None
1368    };
1369    let t3 = if flags & 0x02 != 0 {
1370        Some(buf.get_u64())
1371    } else {
1372        None
1373    };
1374
1375    Ok(Message::Sync(SyncMessage { t1, t2, t3 }))
1376}
1377
1378fn decode_ack(buf: &mut &[u8]) -> Result<Message> {
1379    let flags = buf.get_u8();
1380
1381    let address = if flags & 0x01 != 0 {
1382        Some(decode_string(buf)?)
1383    } else {
1384        None
1385    };
1386    let revision = if flags & 0x02 != 0 {
1387        Some(buf.get_u64())
1388    } else {
1389        None
1390    };
1391    let locked = if flags & 0x04 != 0 {
1392        Some(buf.get_u8() != 0)
1393    } else {
1394        None
1395    };
1396    let holder = if flags & 0x08 != 0 {
1397        Some(decode_string(buf)?)
1398    } else {
1399        None
1400    };
1401    let correlation_id = if flags & 0x10 != 0 {
1402        Some(buf.get_u32())
1403    } else {
1404        None
1405    };
1406
1407    Ok(Message::Ack(AckMessage {
1408        address,
1409        revision,
1410        locked,
1411        holder,
1412        correlation_id,
1413    }))
1414}
1415
1416fn decode_error(buf: &mut &[u8]) -> Result<Message> {
1417    let code = buf.get_u16();
1418    let message = decode_string(buf)?;
1419    let flags = buf.get_u8();
1420
1421    let address = if flags & 0x01 != 0 {
1422        Some(decode_string(buf)?)
1423    } else {
1424        None
1425    };
1426    let correlation_id = if flags & 0x02 != 0 {
1427        Some(buf.get_u32())
1428    } else {
1429        None
1430    };
1431
1432    Ok(Message::Error(ErrorMessage {
1433        code,
1434        message,
1435        address,
1436        correlation_id,
1437    }))
1438}
1439
1440fn decode_query(buf: &mut &[u8]) -> Result<Message> {
1441    let pattern = decode_string(buf)?;
1442    Ok(Message::Query(QueryMessage { pattern }))
1443}
1444
1445fn decode_result(buf: &mut &[u8]) -> Result<Message> {
1446    let count = buf.get_u16() as usize;
1447    let mut signals = Vec::with_capacity(count);
1448
1449    for _ in 0..count {
1450        let address = decode_string(buf)?;
1451        let sig_code = buf.get_u8();
1452        let opt_flags = buf.get_u8();
1453
1454        let datatype = if opt_flags & 0x01 != 0 {
1455            Some(decode_string(buf)?)
1456        } else {
1457            None
1458        };
1459        let access = if opt_flags & 0x02 != 0 {
1460            Some(decode_string(buf)?)
1461        } else {
1462            None
1463        };
1464
1465        signals.push(SignalDefinition {
1466            address,
1467            signal_type: signal_type_from_code(sig_code),
1468            datatype,
1469            access,
1470            meta: None,
1471        });
1472    }
1473
1474    Ok(Message::Result(ResultMessage { signals }))
1475}
1476
1477// ============================================================================
1478// VALUE DECODING HELPERS
1479// ============================================================================
1480
1481#[inline(always)]
1482fn decode_string(buf: &mut &[u8]) -> Result<String> {
1483    if buf.remaining() < 2 {
1484        return Err(Error::BufferTooSmall {
1485            needed: 2,
1486            have: buf.remaining(),
1487        });
1488    }
1489    let len = buf.get_u16() as usize;
1490    if buf.remaining() < len {
1491        return Err(Error::BufferTooSmall {
1492            needed: len,
1493            have: buf.remaining(),
1494        });
1495    }
1496    let bytes = &buf[..len];
1497    buf.advance(len);
1498    String::from_utf8(bytes.to_vec()).map_err(|e| Error::DecodeError(e.to_string()))
1499}
1500
1501#[inline]
1502fn decode_value_data(buf: &mut &[u8], vtype: u8) -> Result<Value> {
1503    match vtype {
1504        val::NULL => Ok(Value::Null),
1505        val::BOOL => {
1506            let b = buf.get_u8();
1507            Ok(Value::Bool(b != 0))
1508        }
1509        val::I8 => {
1510            let i = buf.get_i8() as i64;
1511            Ok(Value::Int(i))
1512        }
1513        val::I16 => {
1514            let i = buf.get_i16() as i64;
1515            Ok(Value::Int(i))
1516        }
1517        val::I32 => {
1518            let i = buf.get_i32() as i64;
1519            Ok(Value::Int(i))
1520        }
1521        val::I64 => {
1522            let i = buf.get_i64();
1523            Ok(Value::Int(i))
1524        }
1525        val::F32 => {
1526            let f = buf.get_f32() as f64;
1527            Ok(Value::Float(f))
1528        }
1529        val::F64 => {
1530            let f = buf.get_f64();
1531            Ok(Value::Float(f))
1532        }
1533        val::STRING => {
1534            let s = decode_string(buf)?;
1535            Ok(Value::String(s))
1536        }
1537        val::BYTES => {
1538            if buf.remaining() < 2 {
1539                return Err(Error::BufferTooSmall {
1540                    needed: 2,
1541                    have: buf.remaining(),
1542                });
1543            }
1544            let len = buf.get_u16() as usize;
1545            if buf.remaining() < len {
1546                return Err(Error::BufferTooSmall {
1547                    needed: len,
1548                    have: buf.remaining(),
1549                });
1550            }
1551            let bytes = buf[..len].to_vec();
1552            buf.advance(len);
1553            Ok(Value::Bytes(bytes))
1554        }
1555        val::ARRAY => {
1556            let count = buf.get_u16() as usize;
1557            let mut arr = Vec::with_capacity(count);
1558            for _ in 0..count {
1559                let item_type = buf.get_u8();
1560                arr.push(decode_value_data(buf, item_type)?);
1561            }
1562            Ok(Value::Array(arr))
1563        }
1564        val::MAP => {
1565            let count = buf.get_u16() as usize;
1566            let mut map = HashMap::with_capacity(count);
1567            for _ in 0..count {
1568                let key = decode_string(buf)?;
1569                let val_type = buf.get_u8();
1570                let val = decode_value_data(buf, val_type)?;
1571                map.insert(key, val);
1572            }
1573            Ok(Value::Map(map))
1574        }
1575        _ => Err(Error::DecodeError(format!(
1576            "unknown value type: 0x{:02x}",
1577            vtype
1578        ))),
1579    }
1580}
1581
1582fn signal_type_from_code(code: u8) -> SignalType {
1583    match code {
1584        sig::PARAM => SignalType::Param,
1585        sig::EVENT => SignalType::Event,
1586        sig::STREAM => SignalType::Stream,
1587        sig::GESTURE => SignalType::Gesture,
1588        sig::TIMELINE => SignalType::Timeline,
1589        _ => SignalType::Event, // Default
1590    }
1591}
1592
1593fn gesture_phase_from_code(code: u8) -> GesturePhase {
1594    match code {
1595        phase::START => GesturePhase::Start,
1596        phase::MOVE => GesturePhase::Move,
1597        phase::END => GesturePhase::End,
1598        phase::CANCEL => GesturePhase::Cancel,
1599        _ => GesturePhase::Start, // Default
1600    }
1601}
1602
1603// ============================================================================
1604// V2 MESSAGEPACK DECODING (BACKWARD COMPATIBILITY)
1605// ============================================================================
1606
1607fn is_msgpack_map(byte: u8) -> bool {
1608    // fixmap: 0x80-0x8F, map16: 0xDE, map32: 0xDF
1609    (byte & 0xF0) == 0x80 || byte == 0xDE || byte == 0xDF
1610}
1611
1612fn decode_v2_msgpack(bytes: &[u8]) -> Result<Message> {
1613    rmp_serde::from_slice(bytes).map_err(|e| Error::DecodeError(e.to_string()))
1614}
1615
1616// ============================================================================
1617// TESTS
1618// ============================================================================
1619
1620#[cfg(test)]
1621mod tests {
1622    use super::*;
1623
1624    #[test]
1625    fn test_hello_roundtrip() {
1626        let msg = Message::Hello(HelloMessage {
1627            version: 1,
1628            name: "Test Client".to_string(),
1629            features: vec!["param".to_string(), "event".to_string()],
1630            capabilities: None,
1631            token: None,
1632        });
1633
1634        let encoded = encode(&msg).unwrap();
1635        let (decoded, frame) = decode(&encoded).unwrap();
1636
1637        match decoded {
1638            Message::Hello(hello) => {
1639                assert_eq!(hello.version, 1);
1640                assert_eq!(hello.name, "Test Client");
1641                assert!(hello.features.contains(&"param".to_string()));
1642                assert!(hello.features.contains(&"event".to_string()));
1643            }
1644            _ => panic!("Expected Hello message"),
1645        }
1646
1647        assert_eq!(frame.flags.qos, QoS::Fire);
1648        assert_eq!(frame.flags.version, 1); // binary encoding
1649    }
1650
1651    #[test]
1652    fn test_set_roundtrip() {
1653        let msg = Message::Set(SetMessage {
1654            address: "/test/value".to_string(),
1655            value: Value::Float(0.75),
1656            revision: Some(42),
1657            lock: false,
1658            unlock: false,
1659        });
1660
1661        let encoded = encode(&msg).unwrap();
1662        let (decoded, frame) = decode(&encoded).unwrap();
1663
1664        match decoded {
1665            Message::Set(set) => {
1666                assert_eq!(set.address, "/test/value");
1667                assert_eq!(set.value.as_f64(), Some(0.75));
1668                assert_eq!(set.revision, Some(42));
1669            }
1670            _ => panic!("Expected Set message"),
1671        }
1672
1673        assert_eq!(frame.flags.qos, QoS::Confirm);
1674    }
1675
1676    #[test]
1677    fn test_set_size_reduction() {
1678        let msg = Message::Set(SetMessage {
1679            address: "/test/value".to_string(),
1680            value: Value::Float(0.5),
1681            revision: Some(1),
1682            lock: false,
1683            unlock: false,
1684        });
1685
1686        // Binary encoding
1687        let binary_payload = encode_message(&msg).unwrap();
1688
1689        // MessagePack encoding (named keys, legacy)
1690        let msgpack_payload = rmp_serde::to_vec_named(&msg).unwrap();
1691
1692        println!("Binary payload: {} bytes", binary_payload.len());
1693        println!("MessagePack payload: {} bytes", msgpack_payload.len());
1694
1695        // Binary encoding should be significantly smaller (target: ~32 bytes vs ~69 bytes)
1696        assert!(
1697            binary_payload.len() < msgpack_payload.len(),
1698            "Binary encoding ({}) should be smaller than MessagePack ({})",
1699            binary_payload.len(),
1700            msgpack_payload.len()
1701        );
1702
1703        // Binary encoding should be at least 40% smaller
1704        let savings = 100 - (binary_payload.len() * 100 / msgpack_payload.len());
1705        println!("Size reduction: {}%", savings);
1706        assert!(
1707            savings >= 40,
1708            "Expected at least 40% size reduction, got {}%",
1709            savings
1710        );
1711    }
1712
1713    #[test]
1714    fn test_bundle_roundtrip() {
1715        let msg = Message::Bundle(BundleMessage {
1716            timestamp: Some(1000000),
1717            messages: vec![
1718                Message::Set(SetMessage {
1719                    address: "/light/1".to_string(),
1720                    value: Value::Float(1.0),
1721                    revision: None,
1722                    lock: false,
1723                    unlock: false,
1724                }),
1725                Message::Set(SetMessage {
1726                    address: "/light/2".to_string(),
1727                    value: Value::Float(0.0),
1728                    revision: None,
1729                    lock: false,
1730                    unlock: false,
1731                }),
1732            ],
1733        });
1734
1735        let encoded = encode(&msg).unwrap();
1736        let (decoded, _) = decode(&encoded).unwrap();
1737
1738        match decoded {
1739            Message::Bundle(bundle) => {
1740                assert_eq!(bundle.timestamp, Some(1000000));
1741                assert_eq!(bundle.messages.len(), 2);
1742            }
1743            _ => panic!("Expected Bundle message"),
1744        }
1745    }
1746
1747    #[test]
1748    fn test_value_types() {
1749        let values = vec![
1750            Value::Null,
1751            Value::Bool(true),
1752            Value::Int(42),
1753            Value::Float(1.25),
1754            Value::String("hello".to_string()),
1755            Value::Array(vec![Value::Int(1), Value::Int(2), Value::Int(3)]),
1756        ];
1757
1758        for value in values {
1759            let msg = Message::Set(SetMessage {
1760                address: "/test".to_string(),
1761                value: value.clone(),
1762                revision: None,
1763                lock: false,
1764                unlock: false,
1765            });
1766
1767            let encoded = encode(&msg).unwrap();
1768            let (decoded, _) = decode(&encoded).unwrap();
1769
1770            match decoded {
1771                Message::Set(set) => {
1772                    assert_eq!(set.value, value);
1773                }
1774                _ => panic!("Expected Set message"),
1775            }
1776        }
1777    }
1778
1779    #[test]
1780    fn test_backward_compat_v2_decode() {
1781        // Create a v2 MessagePack encoded message
1782        let msg = SetMessage {
1783            address: "/test/value".to_string(),
1784            value: Value::Float(0.5),
1785            revision: Some(1),
1786            lock: false,
1787            unlock: false,
1788        };
1789
1790        // Encode as v2 (MessagePack with named keys)
1791        let v2_bytes = rmp_serde::to_vec_named(&Message::Set(msg.clone())).unwrap();
1792
1793        // Should still decode correctly
1794        let decoded = decode_message(&v2_bytes).unwrap();
1795
1796        match decoded {
1797            Message::Set(set) => {
1798                assert_eq!(set.address, "/test/value");
1799                assert_eq!(set.value.as_f64(), Some(0.5));
1800            }
1801            _ => panic!("Expected Set message"),
1802        }
1803    }
1804
1805    #[test]
1806    fn test_ping_pong() {
1807        let ping = encode(&Message::Ping).unwrap();
1808        let (decoded, _) = decode(&ping).unwrap();
1809        assert!(matches!(decoded, Message::Ping));
1810
1811        let pong = encode(&Message::Pong).unwrap();
1812        let (decoded, _) = decode(&pong).unwrap();
1813        assert!(matches!(decoded, Message::Pong));
1814    }
1815
1816    #[test]
1817    fn test_publish_event() {
1818        let msg = Message::Publish(PublishMessage {
1819            address: "/cue/fire".to_string(),
1820            signal: Some(SignalType::Event),
1821            value: None,
1822            payload: Some(Value::String("intro".to_string())),
1823            samples: None,
1824            rate: None,
1825            id: None,
1826            phase: None,
1827            timestamp: Some(1234567890),
1828            timeline: None,
1829        });
1830
1831        let encoded = encode(&msg).unwrap();
1832        let (decoded, _) = decode(&encoded).unwrap();
1833
1834        match decoded {
1835            Message::Publish(pub_msg) => {
1836                assert_eq!(pub_msg.address, "/cue/fire");
1837                assert_eq!(pub_msg.signal, Some(SignalType::Event));
1838                assert_eq!(pub_msg.timestamp, Some(1234567890));
1839            }
1840            _ => panic!("Expected Publish message"),
1841        }
1842    }
1843
1844    #[test]
1845    fn test_subscribe_roundtrip() {
1846        let msg = Message::Subscribe(SubscribeMessage {
1847            id: 42,
1848            pattern: "/lumen/scene/*/layer/**".to_string(),
1849            types: vec![SignalType::Param, SignalType::Stream],
1850            options: Some(SubscribeOptions {
1851                max_rate: Some(60),
1852                epsilon: Some(0.01),
1853                history: None,
1854                window: None,
1855            }),
1856        });
1857
1858        let encoded = encode(&msg).unwrap();
1859        let (decoded, _) = decode(&encoded).unwrap();
1860
1861        match decoded {
1862            Message::Subscribe(sub) => {
1863                assert_eq!(sub.id, 42);
1864                assert_eq!(sub.pattern, "/lumen/scene/*/layer/**");
1865                assert!(sub.types.contains(&SignalType::Param));
1866                assert!(sub.types.contains(&SignalType::Stream));
1867                assert_eq!(sub.options.as_ref().unwrap().max_rate, Some(60));
1868            }
1869            _ => panic!("Expected Subscribe message"),
1870        }
1871    }
1872}