Skip to main content

clasp_core/
codec.rs

1//! CLASP Binary Codec
2//!
3//! Efficient binary encoding for all CLASP messages.
4//! Backward compatible: can decode v2 MessagePack frames.
5//!
6//! # Performance
7//!
8//! Compared to v2 (MessagePack with named keys):
9//! - SET message: 69 bytes → 32 bytes (54% smaller)
10//! - Encoding speed: ~10M msg/s (vs 1.8M)
11//! - Decoding speed: ~12M msg/s (vs 1.5M)
12
13use crate::types::*;
14use crate::{Error, Frame, QoS, Result};
15use bytes::{Buf, BufMut, Bytes, BytesMut};
16use std::collections::HashMap;
17
18/// Encoding version (1 = binary encoding, 0 = MessagePack legacy)
19pub const ENCODING_VERSION: u8 = 1;
20
21/// Message type codes
22pub mod msg {
23    pub const HELLO: u8 = 0x01;
24    pub const WELCOME: u8 = 0x02;
25    pub const ANNOUNCE: u8 = 0x03;
26    pub const SUBSCRIBE: u8 = 0x10;
27    pub const UNSUBSCRIBE: u8 = 0x11;
28    pub const PUBLISH: u8 = 0x20;
29    pub const SET: u8 = 0x21;
30    pub const GET: u8 = 0x22;
31    pub const SNAPSHOT: u8 = 0x23;
32    pub const FEDERATION_SYNC: u8 = 0x04;
33    pub const REPLAY: u8 = 0x24;
34    pub const BUNDLE: u8 = 0x30;
35    pub const SYNC: u8 = 0x40;
36    pub const PING: u8 = 0x41;
37    pub const PONG: u8 = 0x42;
38    pub const ACK: u8 = 0x50;
39    pub const ERROR: u8 = 0x51;
40    pub const QUERY: u8 = 0x60;
41    pub const RESULT: u8 = 0x61;
42}
43
44/// Value type codes for efficient binary encoding
45pub mod val {
46    pub const NULL: u8 = 0x00;
47    pub const BOOL: u8 = 0x01;
48    pub const I8: u8 = 0x02;
49    pub const I16: u8 = 0x03;
50    pub const I32: u8 = 0x04;
51    pub const I64: u8 = 0x05;
52    pub const F32: u8 = 0x06;
53    pub const F64: u8 = 0x07;
54    pub const STRING: u8 = 0x08;
55    pub const BYTES: u8 = 0x09;
56    pub const ARRAY: u8 = 0x0A;
57    pub const MAP: u8 = 0x0B;
58}
59
60/// Signal type codes
61pub mod sig {
62    pub const PARAM: u8 = 0;
63    pub const EVENT: u8 = 1;
64    pub const STREAM: u8 = 2;
65    pub const GESTURE: u8 = 3;
66    pub const TIMELINE: u8 = 4;
67}
68
69/// Gesture phase codes
70pub mod phase {
71    pub const START: u8 = 0;
72    pub const MOVE: u8 = 1;
73    pub const END: u8 = 2;
74    pub const CANCEL: u8 = 3;
75}
76
77// ============================================================================
78// PUBLIC API
79// ============================================================================
80
81/// Encode a message to binary format
82#[inline]
83pub fn encode_message(message: &Message) -> Result<Bytes> {
84    // Pre-allocate based on expected message size
85    let capacity = estimate_message_size(message);
86    let mut buf = BytesMut::with_capacity(capacity);
87    encode_message_to_buf(&mut buf, message)?;
88    Ok(buf.freeze())
89}
90
91/// Estimate message size for pre-allocation (avoids realloc)
92#[inline]
93fn estimate_message_size(msg: &Message) -> usize {
94    match msg {
95        Message::Set(m) => {
96            2 + 2
97                + m.address.len()
98                + 9
99                + if m.revision.is_some() { 8 } else { 0 }
100                + if m.ttl.is_some() { 4 } else { 0 }
101        }
102        Message::Publish(m) => 2 + 2 + m.address.len() + 16,
103        Message::Hello(m) => 4 + m.name.len() + 2,
104        Message::Welcome(m) => 12 + m.name.len() + m.session.len() + 4,
105        Message::Subscribe(m) => 6 + m.pattern.len() + 16,
106        Message::Bundle(m) => 12 + m.messages.len() * 48,
107        Message::Replay(m) => 4 + m.pattern.len() + 20,
108        Message::FederationSync(m) => {
109            4 + m.patterns.iter().map(|p| 2 + p.len()).sum::<usize>() + m.revisions.len() * 12 + 16
110        }
111        Message::Ping | Message::Pong => 5, // Just frame header
112        _ => 64,                            // Default for less common messages
113    }
114}
115
116/// Decode a message - auto-detects MessagePack (legacy) vs binary encoding
117#[inline]
118pub fn decode_message(bytes: &[u8]) -> Result<Message> {
119    if bytes.is_empty() {
120        return Err(Error::BufferTooSmall { needed: 1, have: 0 });
121    }
122
123    let first = bytes[0];
124
125    // Binary encoded messages start with known message type codes (0x01-0x61)
126    // v2 MessagePack maps start with 0x80-0x8F (fixmap) or 0xDE-0xDF (map16/map32)
127    if is_msgpack_map(first) {
128        // Legacy v2 format - use rmp-serde
129        decode_v2_msgpack(bytes)
130    } else {
131        // Binary encoding format
132        decode_v3_binary(bytes)
133    }
134}
135
136/// Encode a message into a complete frame (binary encoding)
137#[inline]
138pub fn encode(message: &Message) -> Result<Bytes> {
139    let payload = encode_message(message)?;
140    let mut frame = Frame::new(payload).with_qos(message.default_qos());
141    frame.flags.version = 1; // binary encoding (1 = binary, 0 = MessagePack legacy)
142    frame.encode()
143}
144
145/// Encode a message with options (binary encoding)
146pub fn encode_with_options(
147    message: &Message,
148    qos: Option<QoS>,
149    timestamp: Option<u64>,
150) -> Result<Bytes> {
151    let payload = encode_message(message)?;
152    let mut frame = Frame::new(payload);
153    frame.flags.version = 1; // binary encoding (1 = binary, 0 = MessagePack legacy)
154
155    if let Some(qos) = qos {
156        frame = frame.with_qos(qos);
157    } else {
158        frame = frame.with_qos(message.default_qos());
159    }
160
161    if let Some(ts) = timestamp {
162        frame = frame.with_timestamp(ts);
163    }
164
165    frame.encode()
166}
167
168/// Decode a frame and extract the message
169#[inline]
170pub fn decode(bytes: &[u8]) -> Result<(Message, Frame)> {
171    let frame = Frame::decode(bytes)?;
172    let message = decode_message(&frame.payload)?;
173    Ok((message, frame))
174}
175
176/// Helper to encode just the message payload (without frame) - binary encoding
177pub fn encode_payload(message: &Message) -> Result<Vec<u8>> {
178    let bytes = encode_message(message)?;
179    Ok(bytes.to_vec())
180}
181
182/// Helper to decode just a message payload (without frame)
183pub fn decode_payload(bytes: &[u8]) -> Result<Message> {
184    decode_message(bytes)
185}
186
187// ============================================================================
188// BINARY ENCODING
189// ============================================================================
190
191fn encode_message_to_buf(buf: &mut BytesMut, msg: &Message) -> Result<()> {
192    match msg {
193        Message::Hello(m) => encode_hello(buf, m),
194        Message::Welcome(m) => encode_welcome(buf, m),
195        Message::Announce(m) => encode_announce(buf, m),
196        Message::Subscribe(m) => encode_subscribe(buf, m),
197        Message::Unsubscribe(m) => encode_unsubscribe(buf, m),
198        Message::Publish(m) => encode_publish(buf, m),
199        Message::Set(m) => encode_set(buf, m),
200        Message::Get(m) => encode_get(buf, m),
201        Message::Snapshot(m) => encode_snapshot(buf, m),
202        Message::Replay(m) => encode_replay(buf, m),
203        Message::FederationSync(m) => encode_federation_sync(buf, m),
204        Message::Bundle(m) => encode_bundle(buf, m),
205        Message::Sync(m) => encode_sync(buf, m),
206        Message::Ping => {
207            buf.put_u8(msg::PING);
208            Ok(())
209        }
210        Message::Pong => {
211            buf.put_u8(msg::PONG);
212            Ok(())
213        }
214        Message::Ack(m) => encode_ack(buf, m),
215        Message::Error(m) => encode_error(buf, m),
216        Message::Query(m) => encode_query(buf, m),
217        Message::Result(m) => encode_result(buf, m),
218    }
219}
220
221/// SET (0x21) - Parameter Update
222/// Flags: [has_rev:1][lock:1][unlock:1][rsv:1][vtype:4]
223#[inline]
224fn encode_set(buf: &mut BytesMut, msg: &SetMessage) -> Result<()> {
225    buf.put_u8(msg::SET);
226
227    let vtype = value_type_code(&msg.value);
228    let mut flags = vtype & 0x0F;
229    if msg.revision.is_some() {
230        flags |= 0x80;
231    }
232    if msg.lock {
233        flags |= 0x40;
234    }
235    if msg.unlock {
236        flags |= 0x20;
237    }
238    if msg.ttl.is_some() {
239        flags |= 0x10;
240    }
241    buf.put_u8(flags);
242
243    // Address
244    encode_string(buf, &msg.address)?;
245
246    // Value (type already in flags for simple types)
247    encode_value_data(buf, &msg.value)?;
248
249    // Optional revision
250    if let Some(rev) = msg.revision {
251        buf.put_u64(rev);
252    }
253
254    // Optional TTL
255    if let Some(ttl) = msg.ttl {
256        let raw = match ttl {
257            Ttl::Never => 0u32,
258            Ttl::Sliding(secs) => secs & 0x7FFF_FFFF,
259            Ttl::Absolute(secs) => (secs & 0x7FFF_FFFF) | 0x8000_0000,
260        };
261        buf.put_u32(raw);
262    }
263
264    Ok(())
265}
266
267/// PUBLISH (0x20) - Event/Stream/Gesture
268/// Flags: [sig_type:3][has_ts:1][has_id:1][phase:3]
269fn encode_publish(buf: &mut BytesMut, msg: &PublishMessage) -> Result<()> {
270    buf.put_u8(msg::PUBLISH);
271
272    let sig_code = msg.signal.map(signal_type_code).unwrap_or(sig::EVENT);
273    let phase_code = msg.phase.map(gesture_phase_code).unwrap_or(phase::START);
274
275    let mut flags: u8 = (sig_code & 0x07) << 5;
276    if msg.timestamp.is_some() {
277        flags |= 0x10;
278    }
279    if msg.id.is_some() {
280        flags |= 0x08;
281    }
282    flags |= phase_code & 0x07;
283    buf.put_u8(flags);
284
285    // Address
286    encode_string(buf, &msg.address)?;
287
288    // Value/payload
289    if let Some(ref value) = msg.value {
290        buf.put_u8(1); // has value
291        buf.put_u8(value_type_code(value));
292        encode_value_data(buf, value)?;
293    } else if let Some(ref payload) = msg.payload {
294        buf.put_u8(1); // has payload
295        buf.put_u8(value_type_code(payload));
296        encode_value_data(buf, payload)?;
297    } else if let Some(ref samples) = msg.samples {
298        buf.put_u8(2); // has samples
299        buf.put_u16(samples.len() as u16);
300        for sample in samples {
301            buf.put_f64(*sample);
302        }
303    } else {
304        buf.put_u8(0); // no value
305    }
306
307    // Optional timestamp
308    if let Some(ts) = msg.timestamp {
309        buf.put_u64(ts);
310    }
311
312    // Optional gesture ID
313    if let Some(id) = msg.id {
314        buf.put_u32(id);
315    }
316
317    // Optional rate
318    if let Some(rate) = msg.rate {
319        buf.put_u32(rate);
320    }
321
322    Ok(())
323}
324
325/// HELLO (0x01)
326fn encode_hello(buf: &mut BytesMut, msg: &HelloMessage) -> Result<()> {
327    buf.put_u8(msg::HELLO);
328    buf.put_u8(msg.version);
329
330    // Feature flags
331    let mut features: u8 = 0;
332    for f in &msg.features {
333        match f.as_str() {
334            "param" => features |= 0x80,
335            "event" => features |= 0x40,
336            "stream" => features |= 0x20,
337            "gesture" => features |= 0x10,
338            "timeline" => features |= 0x08,
339            "federation" => features |= 0x04,
340            _ => {}
341        }
342    }
343    buf.put_u8(features);
344
345    // Name
346    encode_string(buf, &msg.name)?;
347
348    // Token (optional)
349    if let Some(ref token) = msg.token {
350        encode_string(buf, token)?;
351    } else {
352        buf.put_u16(0);
353    }
354
355    Ok(())
356}
357
358/// WELCOME (0x02)
359fn encode_welcome(buf: &mut BytesMut, msg: &WelcomeMessage) -> Result<()> {
360    buf.put_u8(msg::WELCOME);
361    buf.put_u8(msg.version);
362
363    // Feature flags (same as HELLO)
364    let mut features: u8 = 0;
365    for f in &msg.features {
366        match f.as_str() {
367            "param" => features |= 0x80,
368            "event" => features |= 0x40,
369            "stream" => features |= 0x20,
370            "gesture" => features |= 0x10,
371            "timeline" => features |= 0x08,
372            _ => {}
373        }
374    }
375    buf.put_u8(features);
376
377    // Server time
378    buf.put_u64(msg.time);
379
380    // Session ID
381    encode_string(buf, &msg.session)?;
382
383    // Server name
384    encode_string(buf, &msg.name)?;
385
386    // Token (optional)
387    if let Some(ref token) = msg.token {
388        encode_string(buf, token)?;
389    } else {
390        buf.put_u16(0);
391    }
392
393    Ok(())
394}
395
396/// ANNOUNCE (0x03)
397fn encode_announce(buf: &mut BytesMut, msg: &AnnounceMessage) -> Result<()> {
398    buf.put_u8(msg::ANNOUNCE);
399
400    encode_string(buf, &msg.namespace)?;
401    buf.put_u16(msg.signals.len() as u16);
402
403    for sig in &msg.signals {
404        encode_string(buf, &sig.address)?;
405        buf.put_u8(signal_type_code(sig.signal_type));
406
407        // Optional fields flags
408        let mut opt_flags: u8 = 0;
409        if sig.datatype.is_some() {
410            opt_flags |= 0x01;
411        }
412        if sig.access.is_some() {
413            opt_flags |= 0x02;
414        }
415        if sig.meta.is_some() {
416            opt_flags |= 0x04;
417        }
418        buf.put_u8(opt_flags);
419
420        if let Some(ref dt) = sig.datatype {
421            encode_string(buf, dt)?;
422        }
423        if let Some(ref access) = sig.access {
424            encode_string(buf, access)?;
425        }
426        if let Some(ref meta) = sig.meta {
427            // Encode meta as simple fields
428            let mut meta_flags: u8 = 0;
429            if meta.unit.is_some() {
430                meta_flags |= 0x01;
431            }
432            if meta.range.is_some() {
433                meta_flags |= 0x02;
434            }
435            if meta.default.is_some() {
436                meta_flags |= 0x04;
437            }
438            if meta.description.is_some() {
439                meta_flags |= 0x08;
440            }
441            buf.put_u8(meta_flags);
442
443            if let Some(ref unit) = meta.unit {
444                encode_string(buf, unit)?;
445            }
446            if let Some((min, max)) = meta.range {
447                buf.put_f64(min);
448                buf.put_f64(max);
449            }
450            if let Some(ref default) = meta.default {
451                buf.put_u8(value_type_code(default));
452                encode_value_data(buf, default)?;
453            }
454            if let Some(ref desc) = meta.description {
455                encode_string(buf, desc)?;
456            }
457        }
458    }
459
460    Ok(())
461}
462
463/// SUBSCRIBE (0x10)
464fn encode_subscribe(buf: &mut BytesMut, msg: &SubscribeMessage) -> Result<()> {
465    buf.put_u8(msg::SUBSCRIBE);
466    buf.put_u32(msg.id);
467
468    encode_string(buf, &msg.pattern)?;
469
470    // Signal type filter as bitmask
471    let mut type_mask: u8 = 0;
472    if msg.types.is_empty() {
473        type_mask = 0xFF; // All types
474    } else {
475        for t in &msg.types {
476            match t {
477                SignalType::Param => type_mask |= 0x01,
478                SignalType::Event => type_mask |= 0x02,
479                SignalType::Stream => type_mask |= 0x04,
480                SignalType::Gesture => type_mask |= 0x08,
481                SignalType::Timeline => type_mask |= 0x10,
482            }
483        }
484    }
485    buf.put_u8(type_mask);
486
487    // Options
488    if let Some(ref opts) = msg.options {
489        let mut opt_flags: u8 = 0;
490        if opts.max_rate.is_some() {
491            opt_flags |= 0x01;
492        }
493        if opts.epsilon.is_some() {
494            opt_flags |= 0x02;
495        }
496        if opts.history.is_some() {
497            opt_flags |= 0x04;
498        }
499        if opts.window.is_some() {
500            opt_flags |= 0x08;
501        }
502        buf.put_u8(opt_flags);
503
504        if let Some(rate) = opts.max_rate {
505            buf.put_u32(rate);
506        }
507        if let Some(eps) = opts.epsilon {
508            buf.put_f64(eps);
509        }
510        if let Some(hist) = opts.history {
511            buf.put_u32(hist);
512        }
513        if let Some(win) = opts.window {
514            buf.put_u32(win);
515        }
516    } else {
517        buf.put_u8(0); // No options
518    }
519
520    Ok(())
521}
522
523/// UNSUBSCRIBE (0x11)
524fn encode_unsubscribe(buf: &mut BytesMut, msg: &UnsubscribeMessage) -> Result<()> {
525    buf.put_u8(msg::UNSUBSCRIBE);
526    buf.put_u32(msg.id);
527    Ok(())
528}
529
530/// GET (0x22)
531fn encode_get(buf: &mut BytesMut, msg: &GetMessage) -> Result<()> {
532    buf.put_u8(msg::GET);
533    encode_string(buf, &msg.address)?;
534    Ok(())
535}
536
537/// SNAPSHOT (0x23)
538fn encode_snapshot(buf: &mut BytesMut, msg: &SnapshotMessage) -> Result<()> {
539    buf.put_u8(msg::SNAPSHOT);
540    buf.put_u16(msg.params.len() as u16);
541
542    for param in &msg.params {
543        encode_string(buf, &param.address)?;
544        buf.put_u8(value_type_code(&param.value));
545        encode_value_data(buf, &param.value)?;
546        buf.put_u64(param.revision);
547
548        let mut opt_flags: u8 = 0;
549        if param.writer.is_some() {
550            opt_flags |= 0x01;
551        }
552        if param.timestamp.is_some() {
553            opt_flags |= 0x02;
554        }
555        buf.put_u8(opt_flags);
556
557        if let Some(ref writer) = param.writer {
558            encode_string(buf, writer)?;
559        }
560        if let Some(ts) = param.timestamp {
561            buf.put_u64(ts);
562        }
563    }
564
565    Ok(())
566}
567
568/// REPLAY (0x24) - Journal replay request
569/// Flags: [has_from:1][has_to:1][has_limit:1][rsv:5]
570fn encode_replay(buf: &mut BytesMut, msg: &ReplayMessage) -> Result<()> {
571    buf.put_u8(msg::REPLAY);
572
573    let mut flags: u8 = 0;
574    if msg.from.is_some() {
575        flags |= 0x80;
576    }
577    if msg.to.is_some() {
578        flags |= 0x40;
579    }
580    if msg.limit.is_some() {
581        flags |= 0x20;
582    }
583    buf.put_u8(flags);
584
585    encode_string(buf, &msg.pattern)?;
586
587    if let Some(from) = msg.from {
588        buf.put_u64(from);
589    }
590    if let Some(to) = msg.to {
591        buf.put_u64(to);
592    }
593    if let Some(limit) = msg.limit {
594        buf.put_u32(limit);
595    }
596
597    // Signal type filter as bitmask (same format as SUBSCRIBE)
598    let mut type_mask: u8 = 0;
599    if msg.types.is_empty() {
600        type_mask = 0xFF; // All types
601    } else {
602        for t in &msg.types {
603            match t {
604                SignalType::Param => type_mask |= 0x01,
605                SignalType::Event => type_mask |= 0x02,
606                SignalType::Stream => type_mask |= 0x04,
607                SignalType::Gesture => type_mask |= 0x08,
608                SignalType::Timeline => type_mask |= 0x10,
609            }
610        }
611    }
612    buf.put_u8(type_mask);
613
614    Ok(())
615}
616
617/// FEDERATION_SYNC (0x04) - Router-to-router federation
618/// Layout: [op:u8][pattern_count:u16][patterns...][revision_count:u16][revisions...][flags:u8][optional fields]
619fn encode_federation_sync(buf: &mut BytesMut, msg: &FederationSyncMessage) -> Result<()> {
620    buf.put_u8(msg::FEDERATION_SYNC);
621    buf.put_u8(msg.op as u8);
622
623    // Encode patterns
624    buf.put_u16(msg.patterns.len() as u16);
625    for pattern in &msg.patterns {
626        encode_string(buf, pattern)?;
627    }
628
629    // Encode revisions map
630    buf.put_u16(msg.revisions.len() as u16);
631    for (key, val) in &msg.revisions {
632        encode_string(buf, key)?;
633        buf.put_u64(*val);
634    }
635
636    // Flags: [has_since:1][has_origin:1][rsv:6]
637    let mut flags: u8 = 0;
638    if msg.since_revision.is_some() {
639        flags |= 0x80;
640    }
641    if msg.origin.is_some() {
642        flags |= 0x40;
643    }
644    buf.put_u8(flags);
645
646    if let Some(since) = msg.since_revision {
647        buf.put_u64(since);
648    }
649    if let Some(ref origin) = msg.origin {
650        encode_string(buf, origin)?;
651    }
652
653    Ok(())
654}
655
656/// BUNDLE (0x30)
657fn encode_bundle(buf: &mut BytesMut, msg: &BundleMessage) -> Result<()> {
658    buf.put_u8(msg::BUNDLE);
659
660    let mut flags: u8 = 0;
661    if msg.timestamp.is_some() {
662        flags |= 0x80;
663    }
664    buf.put_u8(flags);
665
666    buf.put_u16(msg.messages.len() as u16);
667
668    if let Some(ts) = msg.timestamp {
669        buf.put_u64(ts);
670    }
671
672    // Each message prefixed with length
673    for inner_msg in &msg.messages {
674        let mut inner_buf = BytesMut::with_capacity(64);
675        encode_message_to_buf(&mut inner_buf, inner_msg)?;
676        buf.put_u16(inner_buf.len() as u16);
677        buf.extend_from_slice(&inner_buf);
678    }
679
680    Ok(())
681}
682
683/// SYNC (0x40)
684fn encode_sync(buf: &mut BytesMut, msg: &SyncMessage) -> Result<()> {
685    buf.put_u8(msg::SYNC);
686
687    let mut flags: u8 = 0;
688    if msg.t2.is_some() {
689        flags |= 0x01;
690    }
691    if msg.t3.is_some() {
692        flags |= 0x02;
693    }
694    buf.put_u8(flags);
695
696    buf.put_u64(msg.t1);
697    if let Some(t2) = msg.t2 {
698        buf.put_u64(t2);
699    }
700    if let Some(t3) = msg.t3 {
701        buf.put_u64(t3);
702    }
703
704    Ok(())
705}
706
707/// ACK (0x50)
708fn encode_ack(buf: &mut BytesMut, msg: &AckMessage) -> Result<()> {
709    buf.put_u8(msg::ACK);
710
711    let mut flags: u8 = 0;
712    if msg.address.is_some() {
713        flags |= 0x01;
714    }
715    if msg.revision.is_some() {
716        flags |= 0x02;
717    }
718    if msg.locked.is_some() {
719        flags |= 0x04;
720    }
721    if msg.holder.is_some() {
722        flags |= 0x08;
723    }
724    if msg.correlation_id.is_some() {
725        flags |= 0x10;
726    }
727    buf.put_u8(flags);
728
729    if let Some(ref addr) = msg.address {
730        encode_string(buf, addr)?;
731    }
732    if let Some(rev) = msg.revision {
733        buf.put_u64(rev);
734    }
735    if let Some(locked) = msg.locked {
736        buf.put_u8(if locked { 1 } else { 0 });
737    }
738    if let Some(ref holder) = msg.holder {
739        encode_string(buf, holder)?;
740    }
741    if let Some(corr) = msg.correlation_id {
742        buf.put_u32(corr);
743    }
744
745    Ok(())
746}
747
748/// ERROR (0x51)
749fn encode_error(buf: &mut BytesMut, msg: &ErrorMessage) -> Result<()> {
750    buf.put_u8(msg::ERROR);
751    buf.put_u16(msg.code);
752    encode_string(buf, &msg.message)?;
753
754    let mut flags: u8 = 0;
755    if msg.address.is_some() {
756        flags |= 0x01;
757    }
758    if msg.correlation_id.is_some() {
759        flags |= 0x02;
760    }
761    buf.put_u8(flags);
762
763    if let Some(ref addr) = msg.address {
764        encode_string(buf, addr)?;
765    }
766    if let Some(corr) = msg.correlation_id {
767        buf.put_u32(corr);
768    }
769
770    Ok(())
771}
772
773/// QUERY (0x60)
774fn encode_query(buf: &mut BytesMut, msg: &QueryMessage) -> Result<()> {
775    buf.put_u8(msg::QUERY);
776    encode_string(buf, &msg.pattern)?;
777    Ok(())
778}
779
780/// RESULT (0x61)
781fn encode_result(buf: &mut BytesMut, msg: &ResultMessage) -> Result<()> {
782    buf.put_u8(msg::RESULT);
783    buf.put_u16(msg.signals.len() as u16);
784
785    for sig in &msg.signals {
786        encode_string(buf, &sig.address)?;
787        buf.put_u8(signal_type_code(sig.signal_type));
788
789        let mut opt_flags: u8 = 0;
790        if sig.datatype.is_some() {
791            opt_flags |= 0x01;
792        }
793        if sig.access.is_some() {
794            opt_flags |= 0x02;
795        }
796        buf.put_u8(opt_flags);
797
798        if let Some(ref dt) = sig.datatype {
799            encode_string(buf, dt)?;
800        }
801        if let Some(ref access) = sig.access {
802            encode_string(buf, access)?;
803        }
804    }
805
806    Ok(())
807}
808
809// ============================================================================
810// VALUE ENCODING HELPERS
811// ============================================================================
812
813#[inline(always)]
814fn encode_string(buf: &mut BytesMut, s: &str) -> Result<()> {
815    let bytes = s.as_bytes();
816    if bytes.len() > u16::MAX as usize {
817        return Err(Error::PayloadTooLarge(bytes.len()));
818    }
819    buf.put_u16(bytes.len() as u16);
820    buf.extend_from_slice(bytes);
821    Ok(())
822}
823
824#[inline]
825fn encode_value_data(buf: &mut BytesMut, value: &Value) -> Result<()> {
826    match value {
827        Value::Null => {} // Type code is enough
828        Value::Bool(b) => buf.put_u8(if *b { 1 } else { 0 }),
829        Value::Int(i) => buf.put_i64(*i),
830        Value::Float(f) => buf.put_f64(*f),
831        Value::String(s) => encode_string(buf, s)?,
832        Value::Bytes(b) => {
833            buf.put_u16(b.len() as u16);
834            buf.extend_from_slice(b);
835        }
836        Value::Array(arr) => {
837            buf.put_u16(arr.len() as u16);
838            for item in arr {
839                buf.put_u8(value_type_code(item));
840                encode_value_data(buf, item)?;
841            }
842        }
843        Value::Map(map) => {
844            buf.put_u16(map.len() as u16);
845            for (key, val) in map {
846                encode_string(buf, key)?;
847                buf.put_u8(value_type_code(val));
848                encode_value_data(buf, val)?;
849            }
850        }
851    }
852    Ok(())
853}
854
855#[inline(always)]
856fn value_type_code(value: &Value) -> u8 {
857    match value {
858        Value::Null => val::NULL,
859        Value::Bool(_) => val::BOOL,
860        Value::Int(_) => val::I64,
861        Value::Float(_) => val::F64,
862        Value::String(_) => val::STRING,
863        Value::Bytes(_) => val::BYTES,
864        Value::Array(_) => val::ARRAY,
865        Value::Map(_) => val::MAP,
866    }
867}
868
869fn signal_type_code(sig: SignalType) -> u8 {
870    match sig {
871        SignalType::Param => sig::PARAM,
872        SignalType::Event => sig::EVENT,
873        SignalType::Stream => sig::STREAM,
874        SignalType::Gesture => sig::GESTURE,
875        SignalType::Timeline => sig::TIMELINE,
876    }
877}
878
879fn gesture_phase_code(phase: GesturePhase) -> u8 {
880    match phase {
881        GesturePhase::Start => phase::START,
882        GesturePhase::Move => phase::MOVE,
883        GesturePhase::End => phase::END,
884        GesturePhase::Cancel => phase::CANCEL,
885    }
886}
887
888// ============================================================================
889// BINARY DECODING
890// ============================================================================
891
892fn decode_v3_binary(bytes: &[u8]) -> Result<Message> {
893    if bytes.is_empty() {
894        return Err(Error::BufferTooSmall { needed: 1, have: 0 });
895    }
896
897    let mut buf = bytes;
898    let msg_type = buf.get_u8();
899
900    match msg_type {
901        msg::HELLO => decode_hello(&mut buf),
902        msg::WELCOME => decode_welcome(&mut buf),
903        msg::ANNOUNCE => decode_announce(&mut buf),
904        msg::SUBSCRIBE => decode_subscribe(&mut buf),
905        msg::UNSUBSCRIBE => decode_unsubscribe(&mut buf),
906        msg::PUBLISH => decode_publish(&mut buf),
907        msg::SET => decode_set(&mut buf),
908        msg::GET => decode_get(&mut buf),
909        msg::SNAPSHOT => decode_snapshot(&mut buf),
910        msg::REPLAY => decode_replay(&mut buf),
911        msg::FEDERATION_SYNC => decode_federation_sync(&mut buf),
912        msg::BUNDLE => decode_bundle(&mut buf),
913        msg::SYNC => decode_sync(&mut buf),
914        msg::PING => Ok(Message::Ping),
915        msg::PONG => Ok(Message::Pong),
916        msg::ACK => decode_ack(&mut buf),
917        msg::ERROR => decode_error(&mut buf),
918        msg::QUERY => decode_query(&mut buf),
919        msg::RESULT => decode_result(&mut buf),
920        _ => Err(Error::UnknownMessageType(msg_type)),
921    }
922}
923
924#[inline]
925fn decode_set(buf: &mut &[u8]) -> Result<Message> {
926    let flags = buf.get_u8();
927    let vtype = flags & 0x0F;
928    let has_rev = (flags & 0x80) != 0;
929    let lock = (flags & 0x40) != 0;
930    let unlock = (flags & 0x20) != 0;
931    let has_ttl = (flags & 0x10) != 0;
932
933    let address = decode_string(buf)?;
934    let value = decode_value_data(buf, vtype)?;
935
936    let revision = if has_rev { Some(buf.get_u64()) } else { None };
937
938    let ttl = if has_ttl {
939        let raw = buf.get_u32();
940        if raw == 0 {
941            Some(Ttl::Never)
942        } else if raw & 0x8000_0000 != 0 {
943            Some(Ttl::Absolute(raw & 0x7FFF_FFFF))
944        } else {
945            Some(Ttl::Sliding(raw))
946        }
947    } else {
948        None
949    };
950
951    Ok(Message::Set(SetMessage {
952        address,
953        value,
954        revision,
955        lock,
956        unlock,
957        ttl,
958    }))
959}
960
961fn decode_publish(buf: &mut &[u8]) -> Result<Message> {
962    let flags = buf.get_u8();
963    let sig_code = (flags >> 5) & 0x07;
964    let has_ts = (flags & 0x10) != 0;
965    let has_id = (flags & 0x08) != 0;
966    let phase_code = flags & 0x07;
967
968    let address = decode_string(buf)?;
969
970    // Value indicator
971    let value_indicator = buf.get_u8();
972    let (value, payload, samples) = match value_indicator {
973        0 => (None, None, None),
974        1 => {
975            let vtype = buf.get_u8();
976            let v = decode_value_data(buf, vtype)?;
977            (Some(v), None, None)
978        }
979        2 => {
980            let count = buf.get_u16() as usize;
981            let mut s = Vec::with_capacity(count);
982            for _ in 0..count {
983                s.push(buf.get_f64());
984            }
985            (None, None, Some(s))
986        }
987        _ => (None, None, None),
988    };
989
990    let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
991    let id = if has_id { Some(buf.get_u32()) } else { None };
992
993    // Rate (if remaining bytes)
994    let rate = if buf.remaining() >= 4 {
995        Some(buf.get_u32())
996    } else {
997        None
998    };
999
1000    let signal = Some(signal_type_from_code(sig_code));
1001    let phase = Some(gesture_phase_from_code(phase_code));
1002
1003    Ok(Message::Publish(PublishMessage {
1004        address,
1005        signal,
1006        value,
1007        payload,
1008        samples,
1009        rate,
1010        id,
1011        phase,
1012        timestamp,
1013        timeline: None, // Timeline data is encoded separately when signal is Timeline
1014    }))
1015}
1016
1017fn decode_hello(buf: &mut &[u8]) -> Result<Message> {
1018    let version = buf.get_u8();
1019    let feature_flags = buf.get_u8();
1020
1021    let mut features = Vec::new();
1022    if feature_flags & 0x80 != 0 {
1023        features.push("param".to_string());
1024    }
1025    if feature_flags & 0x40 != 0 {
1026        features.push("event".to_string());
1027    }
1028    if feature_flags & 0x20 != 0 {
1029        features.push("stream".to_string());
1030    }
1031    if feature_flags & 0x10 != 0 {
1032        features.push("gesture".to_string());
1033    }
1034    if feature_flags & 0x08 != 0 {
1035        features.push("timeline".to_string());
1036    }
1037    if feature_flags & 0x04 != 0 {
1038        features.push("federation".to_string());
1039    }
1040
1041    let name = decode_string(buf)?;
1042    let token_str = decode_string(buf)?;
1043    let token = if token_str.is_empty() {
1044        None
1045    } else {
1046        Some(token_str)
1047    };
1048
1049    Ok(Message::Hello(HelloMessage {
1050        version,
1051        name,
1052        features,
1053        capabilities: None,
1054        token,
1055    }))
1056}
1057
1058fn decode_welcome(buf: &mut &[u8]) -> Result<Message> {
1059    let version = buf.get_u8();
1060    let feature_flags = buf.get_u8();
1061
1062    let mut features = Vec::new();
1063    if feature_flags & 0x80 != 0 {
1064        features.push("param".to_string());
1065    }
1066    if feature_flags & 0x40 != 0 {
1067        features.push("event".to_string());
1068    }
1069    if feature_flags & 0x20 != 0 {
1070        features.push("stream".to_string());
1071    }
1072    if feature_flags & 0x10 != 0 {
1073        features.push("gesture".to_string());
1074    }
1075    if feature_flags & 0x08 != 0 {
1076        features.push("timeline".to_string());
1077    }
1078
1079    let time = buf.get_u64();
1080    let session = decode_string(buf)?;
1081    let name = decode_string(buf)?;
1082
1083    let token_str = decode_string(buf)?;
1084    let token = if token_str.is_empty() {
1085        None
1086    } else {
1087        Some(token_str)
1088    };
1089
1090    Ok(Message::Welcome(WelcomeMessage {
1091        version,
1092        session,
1093        name,
1094        features,
1095        time,
1096        token,
1097    }))
1098}
1099
1100fn decode_announce(buf: &mut &[u8]) -> Result<Message> {
1101    let namespace = decode_string(buf)?;
1102    let count = buf.get_u16() as usize;
1103
1104    let mut signals = Vec::with_capacity(count);
1105    for _ in 0..count {
1106        let address = decode_string(buf)?;
1107        let sig_code = buf.get_u8();
1108        let opt_flags = buf.get_u8();
1109
1110        let datatype = if opt_flags & 0x01 != 0 {
1111            Some(decode_string(buf)?)
1112        } else {
1113            None
1114        };
1115        let access = if opt_flags & 0x02 != 0 {
1116            Some(decode_string(buf)?)
1117        } else {
1118            None
1119        };
1120
1121        let meta = if opt_flags & 0x04 != 0 {
1122            let meta_flags = buf.get_u8();
1123
1124            let unit = if meta_flags & 0x01 != 0 {
1125                Some(decode_string(buf)?)
1126            } else {
1127                None
1128            };
1129            let range = if meta_flags & 0x02 != 0 {
1130                let min = buf.get_f64();
1131                let max = buf.get_f64();
1132                Some((min, max))
1133            } else {
1134                None
1135            };
1136            let default = if meta_flags & 0x04 != 0 {
1137                let vtype = buf.get_u8();
1138                Some(decode_value_data(buf, vtype)?)
1139            } else {
1140                None
1141            };
1142            let description = if meta_flags & 0x08 != 0 {
1143                Some(decode_string(buf)?)
1144            } else {
1145                None
1146            };
1147
1148            Some(SignalMeta {
1149                unit,
1150                range,
1151                default,
1152                description,
1153            })
1154        } else {
1155            None
1156        };
1157
1158        signals.push(SignalDefinition {
1159            address,
1160            signal_type: signal_type_from_code(sig_code),
1161            datatype,
1162            access,
1163            meta,
1164        });
1165    }
1166
1167    Ok(Message::Announce(AnnounceMessage {
1168        namespace,
1169        signals,
1170        meta: None,
1171    }))
1172}
1173
1174fn decode_subscribe(buf: &mut &[u8]) -> Result<Message> {
1175    let id = buf.get_u32();
1176    let pattern = decode_string(buf)?;
1177    let type_mask = buf.get_u8();
1178
1179    let mut types = Vec::new();
1180    if type_mask == 0xFF {
1181        // All types, leave empty to indicate all
1182    } else {
1183        if type_mask & 0x01 != 0 {
1184            types.push(SignalType::Param);
1185        }
1186        if type_mask & 0x02 != 0 {
1187            types.push(SignalType::Event);
1188        }
1189        if type_mask & 0x04 != 0 {
1190            types.push(SignalType::Stream);
1191        }
1192        if type_mask & 0x08 != 0 {
1193            types.push(SignalType::Gesture);
1194        }
1195        if type_mask & 0x10 != 0 {
1196            types.push(SignalType::Timeline);
1197        }
1198    }
1199
1200    let opt_flags = buf.get_u8();
1201    let options = if opt_flags != 0 {
1202        let max_rate = if opt_flags & 0x01 != 0 {
1203            Some(buf.get_u32())
1204        } else {
1205            None
1206        };
1207        let epsilon = if opt_flags & 0x02 != 0 {
1208            Some(buf.get_f64())
1209        } else {
1210            None
1211        };
1212        let history = if opt_flags & 0x04 != 0 {
1213            Some(buf.get_u32())
1214        } else {
1215            None
1216        };
1217        let window = if opt_flags & 0x08 != 0 {
1218            Some(buf.get_u32())
1219        } else {
1220            None
1221        };
1222
1223        Some(SubscribeOptions {
1224            max_rate,
1225            epsilon,
1226            history,
1227            window,
1228        })
1229    } else {
1230        None
1231    };
1232
1233    Ok(Message::Subscribe(SubscribeMessage {
1234        id,
1235        pattern,
1236        types,
1237        options,
1238    }))
1239}
1240
1241fn decode_unsubscribe(buf: &mut &[u8]) -> Result<Message> {
1242    let id = buf.get_u32();
1243    Ok(Message::Unsubscribe(UnsubscribeMessage { id }))
1244}
1245
1246fn decode_get(buf: &mut &[u8]) -> Result<Message> {
1247    let address = decode_string(buf)?;
1248    Ok(Message::Get(GetMessage { address }))
1249}
1250
1251fn decode_snapshot(buf: &mut &[u8]) -> Result<Message> {
1252    let count = buf.get_u16() as usize;
1253    let mut params = Vec::with_capacity(count);
1254
1255    for _ in 0..count {
1256        let address = decode_string(buf)?;
1257        let vtype = buf.get_u8();
1258        let value = decode_value_data(buf, vtype)?;
1259        let revision = buf.get_u64();
1260        let opt_flags = buf.get_u8();
1261
1262        let writer = if opt_flags & 0x01 != 0 {
1263            Some(decode_string(buf)?)
1264        } else {
1265            None
1266        };
1267        let timestamp = if opt_flags & 0x02 != 0 {
1268            Some(buf.get_u64())
1269        } else {
1270            None
1271        };
1272
1273        params.push(ParamValue {
1274            address,
1275            value,
1276            revision,
1277            writer,
1278            timestamp,
1279        });
1280    }
1281
1282    Ok(Message::Snapshot(SnapshotMessage { params }))
1283}
1284
1285fn decode_replay(buf: &mut &[u8]) -> Result<Message> {
1286    let flags = buf.get_u8();
1287    let has_from = (flags & 0x80) != 0;
1288    let has_to = (flags & 0x40) != 0;
1289    let has_limit = (flags & 0x20) != 0;
1290
1291    let pattern = decode_string(buf)?;
1292
1293    let from = if has_from { Some(buf.get_u64()) } else { None };
1294    let to = if has_to { Some(buf.get_u64()) } else { None };
1295    let limit = if has_limit { Some(buf.get_u32()) } else { None };
1296
1297    let type_mask = buf.get_u8();
1298    let mut types = Vec::new();
1299    if type_mask != 0xFF {
1300        if type_mask & 0x01 != 0 {
1301            types.push(SignalType::Param);
1302        }
1303        if type_mask & 0x02 != 0 {
1304            types.push(SignalType::Event);
1305        }
1306        if type_mask & 0x04 != 0 {
1307            types.push(SignalType::Stream);
1308        }
1309        if type_mask & 0x08 != 0 {
1310            types.push(SignalType::Gesture);
1311        }
1312        if type_mask & 0x10 != 0 {
1313            types.push(SignalType::Timeline);
1314        }
1315    }
1316
1317    Ok(Message::Replay(ReplayMessage {
1318        pattern,
1319        from,
1320        to,
1321        limit,
1322        types,
1323    }))
1324}
1325
1326fn decode_federation_sync(buf: &mut &[u8]) -> Result<Message> {
1327    let op_byte = buf.get_u8();
1328    let op = match op_byte {
1329        0x01 => FederationOp::DeclareNamespaces,
1330        0x02 => FederationOp::RequestSync,
1331        0x03 => FederationOp::RevisionVector,
1332        0x04 => FederationOp::SyncComplete,
1333        _ => return Err(Error::DecodeError("unknown federation op".into())),
1334    };
1335
1336    // Decode patterns
1337    let pattern_count = buf.get_u16() as usize;
1338    let mut patterns = Vec::with_capacity(pattern_count);
1339    for _ in 0..pattern_count {
1340        patterns.push(decode_string(buf)?);
1341    }
1342
1343    // Decode revisions map
1344    let revision_count = buf.get_u16() as usize;
1345    let mut revisions = std::collections::HashMap::with_capacity(revision_count);
1346    for _ in 0..revision_count {
1347        let key = decode_string(buf)?;
1348        let val = buf.get_u64();
1349        revisions.insert(key, val);
1350    }
1351
1352    // Flags: [has_since:1][has_origin:1][rsv:6]
1353    let flags = buf.get_u8();
1354    let since_revision = if flags & 0x80 != 0 {
1355        Some(buf.get_u64())
1356    } else {
1357        None
1358    };
1359    let origin = if flags & 0x40 != 0 {
1360        Some(decode_string(buf)?)
1361    } else {
1362        None
1363    };
1364
1365    Ok(Message::FederationSync(FederationSyncMessage {
1366        op,
1367        patterns,
1368        revisions,
1369        since_revision,
1370        origin,
1371    }))
1372}
1373
1374fn decode_bundle(buf: &mut &[u8]) -> Result<Message> {
1375    let flags = buf.get_u8();
1376    let has_ts = (flags & 0x80) != 0;
1377    let count = buf.get_u16() as usize;
1378
1379    let timestamp = if has_ts { Some(buf.get_u64()) } else { None };
1380
1381    let mut messages = Vec::with_capacity(count);
1382    for _ in 0..count {
1383        let len = buf.get_u16() as usize;
1384        let inner_bytes = &buf[..len];
1385        buf.advance(len);
1386        messages.push(decode_v3_binary(inner_bytes)?);
1387    }
1388
1389    Ok(Message::Bundle(BundleMessage {
1390        timestamp,
1391        messages,
1392    }))
1393}
1394
1395fn decode_sync(buf: &mut &[u8]) -> Result<Message> {
1396    let flags = buf.get_u8();
1397    let t1 = buf.get_u64();
1398    let t2 = if flags & 0x01 != 0 {
1399        Some(buf.get_u64())
1400    } else {
1401        None
1402    };
1403    let t3 = if flags & 0x02 != 0 {
1404        Some(buf.get_u64())
1405    } else {
1406        None
1407    };
1408
1409    Ok(Message::Sync(SyncMessage { t1, t2, t3 }))
1410}
1411
1412fn decode_ack(buf: &mut &[u8]) -> Result<Message> {
1413    let flags = buf.get_u8();
1414
1415    let address = if flags & 0x01 != 0 {
1416        Some(decode_string(buf)?)
1417    } else {
1418        None
1419    };
1420    let revision = if flags & 0x02 != 0 {
1421        Some(buf.get_u64())
1422    } else {
1423        None
1424    };
1425    let locked = if flags & 0x04 != 0 {
1426        Some(buf.get_u8() != 0)
1427    } else {
1428        None
1429    };
1430    let holder = if flags & 0x08 != 0 {
1431        Some(decode_string(buf)?)
1432    } else {
1433        None
1434    };
1435    let correlation_id = if flags & 0x10 != 0 {
1436        Some(buf.get_u32())
1437    } else {
1438        None
1439    };
1440
1441    Ok(Message::Ack(AckMessage {
1442        address,
1443        revision,
1444        locked,
1445        holder,
1446        correlation_id,
1447    }))
1448}
1449
1450fn decode_error(buf: &mut &[u8]) -> Result<Message> {
1451    let code = buf.get_u16();
1452    let message = decode_string(buf)?;
1453    let flags = buf.get_u8();
1454
1455    let address = if flags & 0x01 != 0 {
1456        Some(decode_string(buf)?)
1457    } else {
1458        None
1459    };
1460    let correlation_id = if flags & 0x02 != 0 {
1461        Some(buf.get_u32())
1462    } else {
1463        None
1464    };
1465
1466    Ok(Message::Error(ErrorMessage {
1467        code,
1468        message,
1469        address,
1470        correlation_id,
1471    }))
1472}
1473
1474fn decode_query(buf: &mut &[u8]) -> Result<Message> {
1475    let pattern = decode_string(buf)?;
1476    Ok(Message::Query(QueryMessage { pattern }))
1477}
1478
1479fn decode_result(buf: &mut &[u8]) -> Result<Message> {
1480    let count = buf.get_u16() as usize;
1481    let mut signals = Vec::with_capacity(count);
1482
1483    for _ in 0..count {
1484        let address = decode_string(buf)?;
1485        let sig_code = buf.get_u8();
1486        let opt_flags = buf.get_u8();
1487
1488        let datatype = if opt_flags & 0x01 != 0 {
1489            Some(decode_string(buf)?)
1490        } else {
1491            None
1492        };
1493        let access = if opt_flags & 0x02 != 0 {
1494            Some(decode_string(buf)?)
1495        } else {
1496            None
1497        };
1498
1499        signals.push(SignalDefinition {
1500            address,
1501            signal_type: signal_type_from_code(sig_code),
1502            datatype,
1503            access,
1504            meta: None,
1505        });
1506    }
1507
1508    Ok(Message::Result(ResultMessage { signals }))
1509}
1510
1511// ============================================================================
1512// VALUE DECODING HELPERS
1513// ============================================================================
1514
1515#[inline(always)]
1516fn decode_string(buf: &mut &[u8]) -> Result<String> {
1517    if buf.remaining() < 2 {
1518        return Err(Error::BufferTooSmall {
1519            needed: 2,
1520            have: buf.remaining(),
1521        });
1522    }
1523    let len = buf.get_u16() as usize;
1524    if buf.remaining() < len {
1525        return Err(Error::BufferTooSmall {
1526            needed: len,
1527            have: buf.remaining(),
1528        });
1529    }
1530    let bytes = &buf[..len];
1531    buf.advance(len);
1532    String::from_utf8(bytes.to_vec()).map_err(|e| Error::DecodeError(e.to_string()))
1533}
1534
1535#[inline]
1536fn decode_value_data(buf: &mut &[u8], vtype: u8) -> Result<Value> {
1537    match vtype {
1538        val::NULL => Ok(Value::Null),
1539        val::BOOL => {
1540            let b = buf.get_u8();
1541            Ok(Value::Bool(b != 0))
1542        }
1543        val::I8 => {
1544            let i = buf.get_i8() as i64;
1545            Ok(Value::Int(i))
1546        }
1547        val::I16 => {
1548            let i = buf.get_i16() as i64;
1549            Ok(Value::Int(i))
1550        }
1551        val::I32 => {
1552            let i = buf.get_i32() as i64;
1553            Ok(Value::Int(i))
1554        }
1555        val::I64 => {
1556            let i = buf.get_i64();
1557            Ok(Value::Int(i))
1558        }
1559        val::F32 => {
1560            let f = buf.get_f32() as f64;
1561            Ok(Value::Float(f))
1562        }
1563        val::F64 => {
1564            let f = buf.get_f64();
1565            Ok(Value::Float(f))
1566        }
1567        val::STRING => {
1568            let s = decode_string(buf)?;
1569            Ok(Value::String(s))
1570        }
1571        val::BYTES => {
1572            if buf.remaining() < 2 {
1573                return Err(Error::BufferTooSmall {
1574                    needed: 2,
1575                    have: buf.remaining(),
1576                });
1577            }
1578            let len = buf.get_u16() as usize;
1579            if buf.remaining() < len {
1580                return Err(Error::BufferTooSmall {
1581                    needed: len,
1582                    have: buf.remaining(),
1583                });
1584            }
1585            let bytes = buf[..len].to_vec();
1586            buf.advance(len);
1587            Ok(Value::Bytes(bytes))
1588        }
1589        val::ARRAY => {
1590            let count = buf.get_u16() as usize;
1591            let mut arr = Vec::with_capacity(count);
1592            for _ in 0..count {
1593                let item_type = buf.get_u8();
1594                arr.push(decode_value_data(buf, item_type)?);
1595            }
1596            Ok(Value::Array(arr))
1597        }
1598        val::MAP => {
1599            let count = buf.get_u16() as usize;
1600            let mut map = HashMap::with_capacity(count);
1601            for _ in 0..count {
1602                let key = decode_string(buf)?;
1603                let val_type = buf.get_u8();
1604                let val = decode_value_data(buf, val_type)?;
1605                map.insert(key, val);
1606            }
1607            Ok(Value::Map(map))
1608        }
1609        _ => Err(Error::DecodeError(format!(
1610            "unknown value type: 0x{:02x}",
1611            vtype
1612        ))),
1613    }
1614}
1615
1616fn signal_type_from_code(code: u8) -> SignalType {
1617    match code {
1618        sig::PARAM => SignalType::Param,
1619        sig::EVENT => SignalType::Event,
1620        sig::STREAM => SignalType::Stream,
1621        sig::GESTURE => SignalType::Gesture,
1622        sig::TIMELINE => SignalType::Timeline,
1623        _ => SignalType::Event, // Default
1624    }
1625}
1626
1627fn gesture_phase_from_code(code: u8) -> GesturePhase {
1628    match code {
1629        phase::START => GesturePhase::Start,
1630        phase::MOVE => GesturePhase::Move,
1631        phase::END => GesturePhase::End,
1632        phase::CANCEL => GesturePhase::Cancel,
1633        _ => GesturePhase::Start, // Default
1634    }
1635}
1636
1637// ============================================================================
1638// V2 MESSAGEPACK DECODING (BACKWARD COMPATIBILITY)
1639// ============================================================================
1640
1641fn is_msgpack_map(byte: u8) -> bool {
1642    // fixmap: 0x80-0x8F, map16: 0xDE, map32: 0xDF
1643    (byte & 0xF0) == 0x80 || byte == 0xDE || byte == 0xDF
1644}
1645
1646fn decode_v2_msgpack(bytes: &[u8]) -> Result<Message> {
1647    rmp_serde::from_slice(bytes).map_err(|e| Error::DecodeError(e.to_string()))
1648}
1649
1650// ============================================================================
1651// TESTS
1652// ============================================================================
1653
1654#[cfg(test)]
1655mod tests {
1656    use super::*;
1657
1658    #[test]
1659    fn test_hello_roundtrip() {
1660        let msg = Message::Hello(HelloMessage {
1661            version: 1,
1662            name: "Test Client".to_string(),
1663            features: vec!["param".to_string(), "event".to_string()],
1664            capabilities: None,
1665            token: None,
1666        });
1667
1668        let encoded = encode(&msg).unwrap();
1669        let (decoded, frame) = decode(&encoded).unwrap();
1670
1671        match decoded {
1672            Message::Hello(hello) => {
1673                assert_eq!(hello.version, 1);
1674                assert_eq!(hello.name, "Test Client");
1675                assert!(hello.features.contains(&"param".to_string()));
1676                assert!(hello.features.contains(&"event".to_string()));
1677            }
1678            _ => panic!("Expected Hello message"),
1679        }
1680
1681        assert_eq!(frame.flags.qos, QoS::Fire);
1682        assert_eq!(frame.flags.version, 1); // binary encoding
1683    }
1684
1685    #[test]
1686    fn test_set_roundtrip() {
1687        let msg = Message::Set(SetMessage {
1688            address: "/test/value".to_string(),
1689            value: Value::Float(0.75),
1690            revision: Some(42),
1691            lock: false,
1692            unlock: false,
1693            ttl: None,
1694        });
1695
1696        let encoded = encode(&msg).unwrap();
1697        let (decoded, frame) = decode(&encoded).unwrap();
1698
1699        match decoded {
1700            Message::Set(set) => {
1701                assert_eq!(set.address, "/test/value");
1702                assert_eq!(set.value.as_f64(), Some(0.75));
1703                assert_eq!(set.revision, Some(42));
1704            }
1705            _ => panic!("Expected Set message"),
1706        }
1707
1708        assert_eq!(frame.flags.qos, QoS::Confirm);
1709    }
1710
1711    #[test]
1712    fn test_set_size_reduction() {
1713        let msg = Message::Set(SetMessage {
1714            address: "/test/value".to_string(),
1715            value: Value::Float(0.5),
1716            revision: Some(1),
1717            lock: false,
1718            unlock: false,
1719            ttl: None,
1720        });
1721
1722        // Binary encoding
1723        let binary_payload = encode_message(&msg).unwrap();
1724
1725        // MessagePack encoding (named keys, legacy)
1726        let msgpack_payload = rmp_serde::to_vec_named(&msg).unwrap();
1727
1728        println!("Binary payload: {} bytes", binary_payload.len());
1729        println!("MessagePack payload: {} bytes", msgpack_payload.len());
1730
1731        // Binary encoding should be significantly smaller (target: ~32 bytes vs ~69 bytes)
1732        assert!(
1733            binary_payload.len() < msgpack_payload.len(),
1734            "Binary encoding ({}) should be smaller than MessagePack ({})",
1735            binary_payload.len(),
1736            msgpack_payload.len()
1737        );
1738
1739        // Binary encoding should be at least 40% smaller
1740        let savings = 100 - (binary_payload.len() * 100 / msgpack_payload.len());
1741        println!("Size reduction: {}%", savings);
1742        assert!(
1743            savings >= 40,
1744            "Expected at least 40% size reduction, got {}%",
1745            savings
1746        );
1747    }
1748
1749    #[test]
1750    fn test_bundle_roundtrip() {
1751        let msg = Message::Bundle(BundleMessage {
1752            timestamp: Some(1000000),
1753            messages: vec![
1754                Message::Set(SetMessage {
1755                    address: "/light/1".to_string(),
1756                    value: Value::Float(1.0),
1757                    revision: None,
1758                    lock: false,
1759                    unlock: false,
1760                    ttl: None,
1761                }),
1762                Message::Set(SetMessage {
1763                    address: "/light/2".to_string(),
1764                    value: Value::Float(0.0),
1765                    revision: None,
1766                    lock: false,
1767                    unlock: false,
1768                    ttl: None,
1769                }),
1770            ],
1771        });
1772
1773        let encoded = encode(&msg).unwrap();
1774        let (decoded, _) = decode(&encoded).unwrap();
1775
1776        match decoded {
1777            Message::Bundle(bundle) => {
1778                assert_eq!(bundle.timestamp, Some(1000000));
1779                assert_eq!(bundle.messages.len(), 2);
1780            }
1781            _ => panic!("Expected Bundle message"),
1782        }
1783    }
1784
1785    #[test]
1786    fn test_value_types() {
1787        let values = vec![
1788            Value::Null,
1789            Value::Bool(true),
1790            Value::Int(42),
1791            Value::Float(1.25),
1792            Value::String("hello".to_string()),
1793            Value::Array(vec![Value::Int(1), Value::Int(2), Value::Int(3)]),
1794        ];
1795
1796        for value in values {
1797            let msg = Message::Set(SetMessage {
1798                address: "/test".to_string(),
1799                value: value.clone(),
1800                revision: None,
1801                lock: false,
1802                unlock: false,
1803                ttl: None,
1804            });
1805
1806            let encoded = encode(&msg).unwrap();
1807            let (decoded, _) = decode(&encoded).unwrap();
1808
1809            match decoded {
1810                Message::Set(set) => {
1811                    assert_eq!(set.value, value);
1812                }
1813                _ => panic!("Expected Set message"),
1814            }
1815        }
1816    }
1817
1818    #[test]
1819    fn test_backward_compat_v2_decode() {
1820        // Create a v2 MessagePack encoded message
1821        let msg = SetMessage {
1822            address: "/test/value".to_string(),
1823            value: Value::Float(0.5),
1824            revision: Some(1),
1825            lock: false,
1826            unlock: false,
1827            ttl: None,
1828        };
1829
1830        // Encode as v2 (MessagePack with named keys)
1831        let v2_bytes = rmp_serde::to_vec_named(&Message::Set(msg.clone())).unwrap();
1832
1833        // Should still decode correctly
1834        let decoded = decode_message(&v2_bytes).unwrap();
1835
1836        match decoded {
1837            Message::Set(set) => {
1838                assert_eq!(set.address, "/test/value");
1839                assert_eq!(set.value.as_f64(), Some(0.5));
1840            }
1841            _ => panic!("Expected Set message"),
1842        }
1843    }
1844
1845    #[test]
1846    fn test_ping_pong() {
1847        let ping = encode(&Message::Ping).unwrap();
1848        let (decoded, _) = decode(&ping).unwrap();
1849        assert!(matches!(decoded, Message::Ping));
1850
1851        let pong = encode(&Message::Pong).unwrap();
1852        let (decoded, _) = decode(&pong).unwrap();
1853        assert!(matches!(decoded, Message::Pong));
1854    }
1855
1856    #[test]
1857    fn test_publish_event() {
1858        let msg = Message::Publish(PublishMessage {
1859            address: "/cue/fire".to_string(),
1860            signal: Some(SignalType::Event),
1861            value: None,
1862            payload: Some(Value::String("intro".to_string())),
1863            samples: None,
1864            rate: None,
1865            id: None,
1866            phase: None,
1867            timestamp: Some(1234567890),
1868            timeline: None,
1869        });
1870
1871        let encoded = encode(&msg).unwrap();
1872        let (decoded, _) = decode(&encoded).unwrap();
1873
1874        match decoded {
1875            Message::Publish(pub_msg) => {
1876                assert_eq!(pub_msg.address, "/cue/fire");
1877                assert_eq!(pub_msg.signal, Some(SignalType::Event));
1878                assert_eq!(pub_msg.timestamp, Some(1234567890));
1879            }
1880            _ => panic!("Expected Publish message"),
1881        }
1882    }
1883
1884    #[test]
1885    fn test_subscribe_roundtrip() {
1886        let msg = Message::Subscribe(SubscribeMessage {
1887            id: 42,
1888            pattern: "/lumen/scene/*/layer/**".to_string(),
1889            types: vec![SignalType::Param, SignalType::Stream],
1890            options: Some(SubscribeOptions {
1891                max_rate: Some(60),
1892                epsilon: Some(0.01),
1893                history: None,
1894                window: None,
1895            }),
1896        });
1897
1898        let encoded = encode(&msg).unwrap();
1899        let (decoded, _) = decode(&encoded).unwrap();
1900
1901        match decoded {
1902            Message::Subscribe(sub) => {
1903                assert_eq!(sub.id, 42);
1904                assert_eq!(sub.pattern, "/lumen/scene/*/layer/**");
1905                assert!(sub.types.contains(&SignalType::Param));
1906                assert!(sub.types.contains(&SignalType::Stream));
1907                assert_eq!(sub.options.as_ref().unwrap().max_rate, Some(60));
1908            }
1909            _ => panic!("Expected Subscribe message"),
1910        }
1911    }
1912}