1#![no_std]
28#![allow(dead_code)]
29
30#[cfg(feature = "alloc")]
31extern crate alloc;
32
33#[cfg(feature = "alloc")]
34use alloc::{string::String, vec::Vec};
35
36pub const MAGIC: u8 = 0x53; pub const VERSION: u8 = 1;
45
46pub mod msg {
48 pub const HELLO: u8 = 0x01;
49 pub const WELCOME: u8 = 0x02;
50 pub const SUBSCRIBE: u8 = 0x10;
51 pub const UNSUBSCRIBE: u8 = 0x11;
52 pub const PUBLISH: u8 = 0x20;
53 pub const SET: u8 = 0x21;
54 pub const GET: u8 = 0x22;
55 pub const SNAPSHOT: u8 = 0x23;
56 pub const PING: u8 = 0x41;
57 pub const PONG: u8 = 0x42;
58 pub const ACK: u8 = 0x50;
59 pub const ERROR: u8 = 0x51;
60}
61
62pub mod val {
64 pub const NULL: u8 = 0x00;
65 pub const BOOL: u8 = 0x01;
66 pub const I32: u8 = 0x04;
67 pub const I64: u8 = 0x05;
68 pub const F32: u8 = 0x06;
69 pub const F64: u8 = 0x07;
70 pub const STRING: u8 = 0x08;
71 pub const BYTES: u8 = 0x09;
72}
73
74pub const HEADER_SIZE: usize = 4;
80
81pub const MAX_PAYLOAD: usize = 1024;
83
84pub fn decode_header(buf: &[u8]) -> Option<(u8, usize)> {
86 if buf.len() < HEADER_SIZE || buf[0] != MAGIC {
87 return None;
88 }
89 let flags = buf[1];
90 let len = u16::from_be_bytes([buf[2], buf[3]]) as usize;
91 Some((flags, len))
92}
93
94pub const FLAGS_BINARY: u8 = 0x01; pub fn encode_header(buf: &mut [u8], _flags: u8, payload_len: usize) -> usize {
100 if buf.len() < HEADER_SIZE {
101 return 0;
102 }
103 buf[0] = MAGIC;
104 buf[1] = FLAGS_BINARY; let len = (payload_len as u16).to_be_bytes();
106 buf[2] = len[0];
107 buf[3] = len[1];
108 HEADER_SIZE
109}
110
111#[derive(Clone, Copy, Debug, PartialEq)]
117pub enum Value {
118 Null,
119 Bool(bool),
120 Int(i64),
121 Float(f64),
122}
123
124impl Value {
125 pub fn as_int(&self) -> Option<i64> {
126 match self {
127 Value::Int(i) => Some(*i),
128 Value::Float(f) => Some(*f as i64),
129 _ => None,
130 }
131 }
132
133 pub fn as_float(&self) -> Option<f64> {
134 match self {
135 Value::Float(f) => Some(*f),
136 Value::Int(i) => Some(*i as f64),
137 _ => None,
138 }
139 }
140
141 pub fn as_bool(&self) -> Option<bool> {
142 match self {
143 Value::Bool(b) => Some(*b),
144 _ => None,
145 }
146 }
147}
148
149pub fn encode_value(buf: &mut [u8], value: &Value) -> usize {
151 match value {
152 Value::Null => {
153 if buf.is_empty() {
154 return 0;
155 }
156 buf[0] = val::NULL;
157 1
158 }
159 Value::Bool(b) => {
160 if buf.len() < 2 {
161 return 0;
162 }
163 buf[0] = val::BOOL;
164 buf[1] = if *b { 1 } else { 0 };
165 2
166 }
167 Value::Int(i) => {
168 if buf.len() < 9 {
169 return 0;
170 }
171 buf[0] = val::I64;
172 buf[1..9].copy_from_slice(&i.to_be_bytes());
173 9
174 }
175 Value::Float(f) => {
176 if buf.len() < 9 {
177 return 0;
178 }
179 buf[0] = val::F64;
180 buf[1..9].copy_from_slice(&f.to_be_bytes());
181 9
182 }
183 }
184}
185
186pub fn decode_value(buf: &[u8]) -> Option<(Value, usize)> {
188 if buf.is_empty() {
189 return None;
190 }
191 match buf[0] {
192 val::NULL => Some((Value::Null, 1)),
193 val::BOOL => {
194 if buf.len() < 2 {
195 return None;
196 }
197 Some((Value::Bool(buf[1] != 0), 2))
198 }
199 val::I32 => {
200 if buf.len() < 5 {
201 return None;
202 }
203 let i = i32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]);
204 Some((Value::Int(i as i64), 5))
205 }
206 val::I64 => {
207 if buf.len() < 9 {
208 return None;
209 }
210 let i = i64::from_be_bytes([
211 buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7], buf[8],
212 ]);
213 Some((Value::Int(i), 9))
214 }
215 val::F32 => {
216 if buf.len() < 5 {
217 return None;
218 }
219 let f = f32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]);
220 Some((Value::Float(f as f64), 5))
221 }
222 val::F64 => {
223 if buf.len() < 9 {
224 return None;
225 }
226 let f = f64::from_be_bytes([
227 buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7], buf[8],
228 ]);
229 Some((Value::Float(f), 9))
230 }
231 _ => None,
232 }
233}
234
235pub fn encode_string(buf: &mut [u8], s: &str) -> usize {
241 let bytes = s.as_bytes();
242 if buf.len() < 2 + bytes.len() {
243 return 0;
244 }
245 let len = (bytes.len() as u16).to_be_bytes();
246 buf[0] = len[0];
247 buf[1] = len[1];
248 buf[2..2 + bytes.len()].copy_from_slice(bytes);
249 2 + bytes.len()
250}
251
252pub fn decode_string(buf: &[u8]) -> Option<(&str, usize)> {
254 if buf.len() < 2 {
255 return None;
256 }
257 let len = u16::from_be_bytes([buf[0], buf[1]]) as usize;
258 if buf.len() < 2 + len {
259 return None;
260 }
261 let s = core::str::from_utf8(&buf[2..2 + len]).ok()?;
262 Some((s, 2 + len))
263}
264
265fn value_type_code(value: &Value) -> u8 {
271 match value {
272 Value::Null => val::NULL,
273 Value::Bool(_) => val::BOOL,
274 Value::Int(_) => val::I64,
275 Value::Float(_) => val::F64,
276 }
277}
278
279fn encode_value_data(buf: &mut [u8], value: &Value) -> usize {
281 match value {
282 Value::Null => 0,
283 Value::Bool(b) => {
284 if buf.is_empty() {
285 return 0;
286 }
287 buf[0] = if *b { 1 } else { 0 };
288 1
289 }
290 Value::Int(i) => {
291 if buf.len() < 8 {
292 return 0;
293 }
294 buf[..8].copy_from_slice(&i.to_be_bytes());
295 8
296 }
297 Value::Float(f) => {
298 if buf.len() < 8 {
299 return 0;
300 }
301 buf[..8].copy_from_slice(&f.to_be_bytes());
302 8
303 }
304 }
305}
306
307pub fn encode_set(buf: &mut [u8], address: &str, value: &Value) -> usize {
311 if buf.len() < 2 {
312 return 0;
313 }
314
315 buf[0] = msg::SET;
317
318 let vtype = value_type_code(value);
320 buf[1] = vtype & 0x0F;
321
322 let mut offset = 2;
323
324 offset += encode_string(&mut buf[offset..], address);
326
327 offset += encode_value_data(&mut buf[offset..], value);
329
330 offset
331}
332
333pub fn encode_set_frame(buf: &mut [u8], address: &str, value: &Value) -> usize {
335 let header_size = HEADER_SIZE;
336 let payload_start = header_size;
337
338 let payload_len = encode_set(&mut buf[payload_start..], address, value);
339 if payload_len == 0 {
340 return 0;
341 }
342
343 encode_header(buf, 0, payload_len);
344 header_size + payload_len
345}
346
347pub fn encode_subscribe(buf: &mut [u8], pattern: &str) -> usize {
349 if buf.is_empty() {
350 return 0;
351 }
352 buf[0] = msg::SUBSCRIBE;
353 let mut offset = 1;
354
355 if buf.len() < offset + 4 {
357 return 0;
358 }
359 buf[offset..offset + 4].copy_from_slice(&0u32.to_be_bytes());
360 offset += 4;
361
362 offset += encode_string(&mut buf[offset..], pattern);
364
365 if buf.len() > offset {
367 buf[offset] = 0;
368 offset += 1;
369 }
370
371 offset
372}
373
374pub fn encode_subscribe_frame(buf: &mut [u8], pattern: &str) -> usize {
376 let header_size = HEADER_SIZE;
377 let payload_len = encode_subscribe(&mut buf[header_size..], pattern);
378 if payload_len == 0 {
379 return 0;
380 }
381 encode_header(buf, 0, payload_len);
382 header_size + payload_len
383}
384
385pub fn encode_hello(buf: &mut [u8], name: &str) -> usize {
388 if buf.len() < 6 {
389 return 0;
390 }
391
392 buf[0] = msg::HELLO;
394
395 buf[1] = VERSION;
397
398 buf[2] = 0xF8; let mut offset = 3;
402
403 offset += encode_string(&mut buf[offset..], name);
405
406 if buf.len() >= offset + 2 {
408 buf[offset] = 0;
409 buf[offset + 1] = 0;
410 offset += 2;
411 }
412
413 offset
414}
415
416pub fn encode_hello_frame(buf: &mut [u8], name: &str) -> usize {
418 let header_size = HEADER_SIZE;
419 let payload_len = encode_hello(&mut buf[header_size..], name);
420 if payload_len == 0 {
421 return 0;
422 }
423 encode_header(buf, 0, payload_len);
424 header_size + payload_len
425}
426
427pub fn encode_ping_frame(buf: &mut [u8]) -> usize {
429 if buf.len() < HEADER_SIZE + 1 {
430 return 0;
431 }
432 encode_header(buf, 0, 1);
433 buf[HEADER_SIZE] = msg::PING;
434 HEADER_SIZE + 1
435}
436
437pub fn encode_pong_frame(buf: &mut [u8]) -> usize {
439 if buf.len() < HEADER_SIZE + 1 {
440 return 0;
441 }
442 encode_header(buf, 0, 1);
443 buf[HEADER_SIZE] = msg::PONG;
444 HEADER_SIZE + 1
445}
446
447#[derive(Debug)]
453pub enum Message<'a> {
454 Hello { name: &'a str, version: u8 },
455 Welcome { session: &'a str },
456 Set { address: &'a str, value: Value },
457 Subscribe { id: u32, pattern: &'a str },
458 Unsubscribe { id: u32 },
459 Ping,
460 Pong,
461 Error { code: u16, message: &'a str },
462 Unknown(u8),
463}
464
465pub fn decode_message(payload: &[u8]) -> Option<Message<'_>> {
467 if payload.is_empty() {
468 return None;
469 }
470
471 let msg_type = payload[0];
472 let data = &payload[1..];
473
474 match msg_type {
475 msg::HELLO => {
476 if data.len() < 2 {
478 return None;
479 }
480 let version = data[0];
481 let _features = data[1];
482 let (name, _) = decode_string(&data[2..])?;
483 Some(Message::Hello { name, version })
484 }
485 msg::WELCOME => {
486 if data.len() < 10 {
488 return None;
489 }
490 let _version = data[0];
491 let _features = data[1];
492 let _time = u64::from_be_bytes([
493 data[2], data[3], data[4], data[5], data[6], data[7], data[8], data[9],
494 ]);
495 let (session, _) = decode_string(&data[10..])?;
496 Some(Message::Welcome { session })
497 }
498 msg::SET => {
499 if data.is_empty() {
502 return None;
503 }
504 let flags = data[0];
505 let vtype = flags & 0x0F;
506 let _has_rev = (flags & 0x80) != 0;
507
508 let (address, offset) = decode_string(&data[1..])?;
509 let value_data = &data[1 + offset..];
510
511 let value = match vtype {
512 val::NULL => Value::Null,
513 val::BOOL => {
514 if value_data.is_empty() {
515 return None;
516 }
517 Value::Bool(value_data[0] != 0)
518 }
519 val::I64 => {
520 if value_data.len() < 8 {
521 return None;
522 }
523 let i = i64::from_be_bytes([
524 value_data[0],
525 value_data[1],
526 value_data[2],
527 value_data[3],
528 value_data[4],
529 value_data[5],
530 value_data[6],
531 value_data[7],
532 ]);
533 Value::Int(i)
534 }
535 val::F64 => {
536 if value_data.len() < 8 {
537 return None;
538 }
539 let f = f64::from_be_bytes([
540 value_data[0],
541 value_data[1],
542 value_data[2],
543 value_data[3],
544 value_data[4],
545 value_data[5],
546 value_data[6],
547 value_data[7],
548 ]);
549 Value::Float(f)
550 }
551 _ => return None, };
553
554 Some(Message::Set { address, value })
555 }
556 msg::SUBSCRIBE => {
557 if data.len() < 4 {
559 return None;
560 }
561 let id = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
562 let (pattern, _) = decode_string(&data[4..])?;
563 Some(Message::Subscribe { id, pattern })
564 }
565 msg::UNSUBSCRIBE => {
566 if data.len() < 4 {
568 return None;
569 }
570 let id = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
571 Some(Message::Unsubscribe { id })
572 }
573 msg::PING => Some(Message::Ping),
574 msg::PONG => Some(Message::Pong),
575 msg::ERROR => {
576 if data.len() < 2 {
577 return None;
578 }
579 let code = u16::from_be_bytes([data[0], data[1]]);
580 let (message, _) = decode_string(&data[2..]).unwrap_or(("", 0));
581 Some(Message::Error { code, message })
582 }
583 _ => Some(Message::Unknown(msg_type)),
584 }
585}
586
587pub const MAX_CACHE_ENTRIES: usize = 32;
593
594pub const MAX_ADDRESS_LEN: usize = 64;
596
597#[derive(Clone)]
599pub struct CacheEntry {
600 address: [u8; MAX_ADDRESS_LEN],
601 address_len: u8,
602 value: Value,
603 valid: bool,
604}
605
606impl Default for CacheEntry {
607 fn default() -> Self {
608 Self {
609 address: [0; MAX_ADDRESS_LEN],
610 address_len: 0,
611 value: Value::Null,
612 valid: false,
613 }
614 }
615}
616
617impl CacheEntry {
618 fn address(&self) -> &str {
619 core::str::from_utf8(&self.address[..self.address_len as usize]).unwrap_or("")
620 }
621
622 fn set_address(&mut self, addr: &str) {
623 let bytes = addr.as_bytes();
624 let len = bytes.len().min(MAX_ADDRESS_LEN);
625 self.address[..len].copy_from_slice(&bytes[..len]);
626 self.address_len = len as u8;
627 }
628}
629
630pub struct StateCache {
632 entries: [CacheEntry; MAX_CACHE_ENTRIES],
633 count: usize,
634}
635
636impl StateCache {
637 pub const fn new() -> Self {
638 Self {
639 entries: [const {
640 CacheEntry {
641 address: [0; MAX_ADDRESS_LEN],
642 address_len: 0,
643 value: Value::Null,
644 valid: false,
645 }
646 }; MAX_CACHE_ENTRIES],
647 count: 0,
648 }
649 }
650
651 pub fn get(&self, address: &str) -> Option<Value> {
653 for entry in &self.entries[..self.count] {
654 if entry.valid && entry.address() == address {
655 return Some(entry.value);
656 }
657 }
658 None
659 }
660
661 pub fn set(&mut self, address: &str, value: Value) -> bool {
663 for entry in &mut self.entries[..self.count] {
665 if entry.valid && entry.address() == address {
666 entry.value = value;
667 return true;
668 }
669 }
670
671 if self.count < MAX_CACHE_ENTRIES {
673 self.entries[self.count].set_address(address);
674 self.entries[self.count].value = value;
675 self.entries[self.count].valid = true;
676 self.count += 1;
677 return true;
678 }
679
680 false
681 }
682
683 pub fn len(&self) -> usize {
684 self.count
685 }
686
687 pub fn is_empty(&self) -> bool {
688 self.count == 0
689 }
690
691 pub fn clear(&mut self) {
692 for entry in &mut self.entries {
693 entry.valid = false;
694 }
695 self.count = 0;
696 }
697}
698
699impl Default for StateCache {
700 fn default() -> Self {
701 Self::new()
702 }
703}
704
705#[derive(Debug, Clone, Copy, PartialEq, Eq)]
711pub enum ClientState {
712 Disconnected,
713 Connecting,
714 Connected,
715}
716
717pub const TX_BUF_SIZE: usize = 256;
719pub const RX_BUF_SIZE: usize = 512;
720
721pub struct Client {
726 pub state: ClientState,
727 pub cache: StateCache,
728 tx_buf: [u8; TX_BUF_SIZE],
729 rx_buf: [u8; RX_BUF_SIZE],
730}
731
732impl Client {
733 pub const fn new() -> Self {
734 Self {
735 state: ClientState::Disconnected,
736 cache: StateCache::new(),
737 tx_buf: [0; TX_BUF_SIZE],
738 rx_buf: [0; RX_BUF_SIZE],
739 }
740 }
741
742 pub fn prepare_hello(&mut self, name: &str) -> &[u8] {
744 let n = encode_hello_frame(&mut self.tx_buf, name);
745 &self.tx_buf[..n]
746 }
747
748 pub fn prepare_set(&mut self, address: &str, value: Value) -> &[u8] {
750 let n = encode_set_frame(&mut self.tx_buf, address, &value);
751 &self.tx_buf[..n]
752 }
753
754 pub fn prepare_subscribe(&mut self, pattern: &str) -> &[u8] {
756 let n = encode_subscribe_frame(&mut self.tx_buf, pattern);
757 &self.tx_buf[..n]
758 }
759
760 pub fn prepare_ping(&mut self) -> &[u8] {
762 let n = encode_ping_frame(&mut self.tx_buf);
763 &self.tx_buf[..n]
764 }
765
766 pub fn process<'a>(&mut self, data: &'a [u8]) -> Option<Message<'a>> {
768 let (_, payload_len) = decode_header(data)?;
769 let payload = &data[HEADER_SIZE..HEADER_SIZE + payload_len];
770 let msg = decode_message(payload)?;
771
772 match &msg {
773 Message::Welcome { .. } => {
774 self.state = ClientState::Connected;
775 }
776 Message::Set { address, value } => {
777 self.cache.set(address, *value);
778 }
779 _ => {}
780 }
781
782 Some(msg)
783 }
784
785 pub fn is_connected(&self) -> bool {
786 self.state == ClientState::Connected
787 }
788
789 pub fn get_cached(&self, address: &str) -> Option<Value> {
790 self.cache.get(address)
791 }
792}
793
794impl Default for Client {
795 fn default() -> Self {
796 Self::new()
797 }
798}
799
800#[cfg(feature = "server")]
805pub mod server {
806 use super::*;
807
808 pub const MAX_CLIENTS: usize = 4;
810
811 pub const MAX_SUBS_PER_CLIENT: usize = 8;
813
814 pub const MAX_PATTERN_LEN: usize = 64;
816
817 #[derive(Clone)]
819 pub struct Subscription {
820 pub active: bool,
821 pub id: u32,
822 pub pattern: [u8; MAX_PATTERN_LEN],
823 pub pattern_len: usize,
824 }
825
826 impl Subscription {
827 pub const fn empty() -> Self {
828 Self {
829 active: false,
830 id: 0,
831 pattern: [0; MAX_PATTERN_LEN],
832 pattern_len: 0,
833 }
834 }
835
836 pub fn matches(&self, address: &str) -> bool {
838 if !self.active || self.pattern_len == 0 {
839 return false;
840 }
841
842 let pattern = match core::str::from_utf8(&self.pattern[..self.pattern_len]) {
844 Ok(s) => s,
845 Err(_) => return false,
846 };
847
848 Self::match_pattern(pattern, address)
851 }
852
853 fn match_pattern(pattern: &str, address: &str) -> bool {
854 let mut pattern_iter = pattern.split('/').filter(|s| !s.is_empty());
857 let mut address_iter = address.split('/').filter(|s| !s.is_empty());
858
859 loop {
860 match (pattern_iter.next(), address_iter.next()) {
861 (None, None) => return true,
862 (Some("**"), _) => {
863 if let Some(next_pattern) = pattern_iter.next() {
866 loop {
868 match address_iter.next() {
869 None => return next_pattern == "**",
870 Some(seg) if seg == next_pattern || next_pattern == "*" => {
871 break;
873 }
874 Some(_) => continue,
875 }
876 }
877 } else {
878 return true;
880 }
881 }
882 (Some("*"), Some(_)) => continue,
883 (Some(p), Some(a)) if p == a => continue,
884 (None, Some(_)) => return false,
885 (Some(_), None) => {
886 return pattern_iter.all(|p| p == "**");
888 }
889 _ => return false,
890 }
891 }
892 }
893 }
894
895 pub struct Session {
897 pub active: bool,
898 pub id: u8,
899 pub subscriptions: [Subscription; MAX_SUBS_PER_CLIENT],
900 pub sub_count: u8,
901 }
902
903 impl Session {
904 pub const fn new() -> Self {
905 Self {
906 active: false,
907 id: 0,
908 subscriptions: [const { Subscription::empty() }; MAX_SUBS_PER_CLIENT],
909 sub_count: 0,
910 }
911 }
912
913 pub fn subscribe(&mut self, id: u32, pattern: &str) -> bool {
915 if self.sub_count as usize >= MAX_SUBS_PER_CLIENT {
916 return false;
917 }
918 if pattern.len() > MAX_PATTERN_LEN {
919 return false;
920 }
921
922 for sub in &mut self.subscriptions {
924 if !sub.active {
925 sub.active = true;
926 sub.id = id;
927 sub.pattern[..pattern.len()].copy_from_slice(pattern.as_bytes());
928 sub.pattern_len = pattern.len();
929 self.sub_count += 1;
930 return true;
931 }
932 }
933 false
934 }
935
936 pub fn unsubscribe(&mut self, id: u32) -> bool {
938 for sub in &mut self.subscriptions {
939 if sub.active && sub.id == id {
940 sub.active = false;
941 sub.pattern_len = 0;
942 self.sub_count = self.sub_count.saturating_sub(1);
943 return true;
944 }
945 }
946 false
947 }
948
949 pub fn has_match(&self, address: &str) -> bool {
951 self.subscriptions.iter().any(|s| s.matches(address))
952 }
953 }
954
955 pub struct BroadcastList {
957 pub clients: [bool; MAX_CLIENTS],
958 pub count: u8,
959 }
960
961 impl BroadcastList {
962 pub const fn empty() -> Self {
963 Self {
964 clients: [false; MAX_CLIENTS],
965 count: 0,
966 }
967 }
968 }
969
970 pub struct MiniRouter {
974 pub state: StateCache,
975 sessions: [Session; MAX_CLIENTS],
976 session_count: u8,
977 tx_buf: [u8; TX_BUF_SIZE],
978 }
979
980 impl MiniRouter {
981 pub const fn new() -> Self {
982 Self {
983 state: StateCache::new(),
984 sessions: [const { Session::new() }; MAX_CLIENTS],
985 session_count: 0,
986 tx_buf: [0; TX_BUF_SIZE],
987 }
988 }
989
990 pub fn process(&mut self, client_id: u8, data: &[u8]) -> Option<&[u8]> {
994 let (_, payload_len) = decode_header(data)?;
995 let payload = &data[HEADER_SIZE..HEADER_SIZE + payload_len];
996 let msg = decode_message(payload)?;
997
998 match msg {
999 Message::Hello { name, .. } => {
1000 self.create_session(client_id);
1001 Some(self.prepare_welcome(client_id))
1002 }
1003 Message::Subscribe { id, pattern } => {
1004 self.handle_subscribe(client_id, id, pattern);
1005 None }
1007 Message::Unsubscribe { id } => {
1008 self.handle_unsubscribe(client_id, id);
1009 None
1010 }
1011 Message::Set { address, value } => {
1012 self.state.set(address, value);
1013 None }
1015 Message::Ping => Some(self.prepare_pong()),
1016 _ => None,
1017 }
1018 }
1019
1020 pub fn get_broadcast_targets(&self, address: &str, sender_id: u8) -> BroadcastList {
1024 let mut result = BroadcastList::empty();
1025
1026 for (i, session) in self.sessions.iter().enumerate() {
1027 if session.active && i as u8 != sender_id && session.has_match(address) {
1029 result.clients[i] = true;
1030 result.count += 1;
1031 }
1032 }
1033
1034 result
1035 }
1036
1037 pub fn prepare_broadcast(&mut self, address: &str, value: Value) -> &[u8] {
1041 let n = encode_set_frame(&mut self.tx_buf, address, &value);
1042 &self.tx_buf[..n]
1043 }
1044
1045 fn handle_subscribe(&mut self, client_id: u8, id: u32, pattern: &str) {
1046 if let Some(session) = self.sessions.get_mut(client_id as usize) {
1047 if session.active {
1048 session.subscribe(id, pattern);
1049 }
1050 }
1051 }
1052
1053 fn handle_unsubscribe(&mut self, client_id: u8, id: u32) {
1054 if let Some(session) = self.sessions.get_mut(client_id as usize) {
1055 if session.active {
1056 session.unsubscribe(id);
1057 }
1058 }
1059 }
1060
1061 fn create_session(&mut self, client_id: u8) {
1062 if (client_id as usize) < MAX_CLIENTS {
1063 self.sessions[client_id as usize] = Session {
1064 active: true,
1065 id: client_id,
1066 subscriptions: [const { Subscription::empty() }; MAX_SUBS_PER_CLIENT],
1067 sub_count: 0,
1068 };
1069 self.session_count += 1;
1070 }
1071 }
1072
1073 pub fn disconnect(&mut self, client_id: u8) {
1075 if let Some(session) = self.sessions.get_mut(client_id as usize) {
1076 if session.active {
1077 session.active = false;
1078 session.sub_count = 0;
1079 self.session_count = self.session_count.saturating_sub(1);
1080 }
1081 }
1082 }
1083
1084 fn prepare_welcome(&mut self, _client_id: u8) -> &[u8] {
1085 let payload_start = HEADER_SIZE;
1086 let mut offset = payload_start;
1087
1088 self.tx_buf[offset] = msg::WELCOME;
1089 offset += 1;
1090
1091 self.tx_buf[offset] = VERSION;
1092 offset += 1;
1093
1094 self.tx_buf[offset] = 0xF8; offset += 1;
1096
1097 self.tx_buf[offset..offset + 8].copy_from_slice(&0u64.to_be_bytes());
1098 offset += 8;
1099
1100 offset += encode_string(&mut self.tx_buf[offset..], "embedded");
1101 offset += encode_string(&mut self.tx_buf[offset..], "MiniRouter");
1102
1103 let payload_len = offset - payload_start;
1104 encode_header(&mut self.tx_buf, 0, payload_len);
1105
1106 &self.tx_buf[..offset]
1107 }
1108
1109 fn prepare_pong(&mut self) -> &[u8] {
1110 let n = encode_pong_frame(&mut self.tx_buf);
1111 &self.tx_buf[..n]
1112 }
1113
1114 pub fn get(&self, address: &str) -> Option<Value> {
1115 self.state.get(address)
1116 }
1117
1118 pub fn set(&mut self, address: &str, value: Value) {
1119 self.state.set(address, value);
1120 }
1121
1122 pub fn session_count(&self) -> u8 {
1124 self.session_count
1125 }
1126
1127 pub fn session_mut(&mut self, client_id: u8) -> Option<&mut Session> {
1129 self.sessions.get_mut(client_id as usize)
1130 }
1131 }
1132
1133 impl Default for MiniRouter {
1134 fn default() -> Self {
1135 Self::new()
1136 }
1137 }
1138}
1139
1140#[cfg(test)]
1145mod tests {
1146 use super::*;
1147
1148 #[test]
1149 fn test_encode_decode_value() {
1150 let mut buf = [0u8; 16];
1151
1152 let n = encode_value(&mut buf, &Value::Float(3.14));
1154 assert_eq!(n, 9);
1155 let (v, consumed) = decode_value(&buf).unwrap();
1156 assert_eq!(consumed, 9);
1157 assert!((v.as_float().unwrap() - 3.14).abs() < 0.001);
1158
1159 let n = encode_value(&mut buf, &Value::Int(-42));
1161 let (v, _) = decode_value(&buf).unwrap();
1162 assert_eq!(v.as_int(), Some(-42));
1163 }
1164
1165 #[test]
1166 fn test_encode_decode_set() {
1167 let mut buf = [0u8; 64];
1168 let n = encode_set_frame(&mut buf, "/test/value", &Value::Float(1.5));
1169 assert!(n > HEADER_SIZE);
1170
1171 let (_, payload_len) = decode_header(&buf).unwrap();
1172 let payload = &buf[HEADER_SIZE..HEADER_SIZE + payload_len];
1173 let msg = decode_message(payload).unwrap();
1174
1175 match msg {
1176 Message::Set { address, value } => {
1177 assert_eq!(address, "/test/value");
1178 assert!((value.as_float().unwrap() - 1.5).abs() < 0.001);
1179 }
1180 _ => panic!("Expected Set message"),
1181 }
1182 }
1183
1184 #[test]
1185 fn test_client_flow() {
1186 let mut client = Client::new();
1187 assert_eq!(client.state, ClientState::Disconnected);
1188
1189 let hello = client.prepare_hello("ESP32");
1191 assert!(hello.len() > HEADER_SIZE);
1192
1193 let mut welcome_buf = [0u8; 64];
1195 let payload_start = HEADER_SIZE;
1196 let mut offset = payload_start;
1197
1198 welcome_buf[offset] = msg::WELCOME;
1200 offset += 1;
1201
1202 welcome_buf[offset] = VERSION;
1204 offset += 1;
1205
1206 welcome_buf[offset] = 0xF8;
1208 offset += 1;
1209
1210 welcome_buf[offset..offset + 8].copy_from_slice(&0u64.to_be_bytes());
1212 offset += 8;
1213
1214 offset += encode_string(&mut welcome_buf[offset..], "session123");
1216
1217 offset += encode_string(&mut welcome_buf[offset..], "TestRouter");
1219
1220 encode_header(&mut welcome_buf, 0, offset - payload_start);
1221
1222 client.process(&welcome_buf[..offset]);
1223 assert_eq!(client.state, ClientState::Connected);
1224 }
1225
1226 #[test]
1227 fn test_state_cache() {
1228 let mut cache = StateCache::new();
1229
1230 cache.set("/sensor/temp", Value::Float(25.5));
1231 cache.set("/sensor/humidity", Value::Float(60.0));
1232
1233 assert_eq!(cache.get("/sensor/temp").unwrap().as_float(), Some(25.5));
1234 assert_eq!(
1235 cache.get("/sensor/humidity").unwrap().as_float(),
1236 Some(60.0)
1237 );
1238 assert!(cache.get("/unknown").is_none());
1239 }
1240
1241 #[test]
1242 fn test_memory_size() {
1243 let client_size = core::mem::size_of::<Client>();
1244 let cache_size = core::mem::size_of::<StateCache>();
1245
1246 assert!(
1248 client_size < 4096,
1249 "Client too large: {} bytes",
1250 client_size
1251 );
1252
1253 let total = client_size + 1024; assert!(total < 8192, "Total too large: {} bytes", total);
1256 }
1257
1258 #[cfg(feature = "server")]
1259 #[test]
1260 fn test_mini_router() {
1261 use server::MiniRouter;
1262
1263 let mut router = MiniRouter::new();
1264 router.set("/light/brightness", Value::Float(0.8));
1265
1266 assert_eq!(
1267 router.get("/light/brightness").unwrap().as_float(),
1268 Some(0.8)
1269 );
1270 }
1271
1272 #[cfg(feature = "server")]
1273 #[test]
1274 fn test_mini_router_subscriptions() {
1275 use server::{MiniRouter, Session, Subscription};
1276
1277 let mut sub = Subscription::empty();
1279 sub.active = true;
1280 sub.pattern_len = "/light/*".len();
1281 sub.pattern[..sub.pattern_len].copy_from_slice(b"/light/*");
1282
1283 assert!(sub.matches("/light/brightness"));
1284 assert!(sub.matches("/light/color"));
1285 assert!(!sub.matches("/audio/volume"));
1286 assert!(!sub.matches("/light/zone/1"));
1287
1288 sub.pattern_len = "/light/**".len();
1290 sub.pattern[..sub.pattern_len].copy_from_slice(b"/light/**");
1291 assert!(sub.matches("/light/brightness"));
1292 assert!(sub.matches("/light/zone/1/brightness"));
1293 assert!(!sub.matches("/audio/volume"));
1294
1295 let mut session = Session::new();
1297 session.active = true;
1298 assert!(session.subscribe(1, "/light/*"));
1299 assert!(session.subscribe(2, "/audio/**"));
1300 assert_eq!(session.sub_count, 2);
1301
1302 assert!(session.has_match("/light/brightness"));
1303 assert!(session.has_match("/audio/master/volume"));
1304 assert!(!session.has_match("/midi/cc/1"));
1305
1306 assert!(session.unsubscribe(1));
1308 assert_eq!(session.sub_count, 1);
1309 assert!(!session.has_match("/light/brightness"));
1310 assert!(session.has_match("/audio/master/volume"));
1311 }
1312
1313 #[cfg(feature = "server")]
1314 #[test]
1315 fn test_mini_router_broadcast() {
1316 use server::MiniRouter;
1317
1318 let mut router = MiniRouter::new();
1319
1320 {
1326 let session = router.session_mut(0).unwrap();
1327 session.active = true;
1328 session.id = 0;
1329 session.subscribe(1, "/light/**");
1330 }
1331
1332 {
1333 let session = router.session_mut(1).unwrap();
1334 session.active = true;
1335 session.id = 1;
1336 session.subscribe(1, "/audio/**");
1337 }
1338
1339 router.set("/light/brightness", Value::Float(0.75));
1341
1342 let targets = router.get_broadcast_targets("/light/brightness", 2);
1344 assert_eq!(targets.count, 1);
1345 assert!(targets.clients[0]); assert!(!targets.clients[1]); let targets = router.get_broadcast_targets("/audio/volume", 2);
1350 assert_eq!(targets.count, 1);
1351 assert!(!targets.clients[0]); assert!(targets.clients[1]); let targets = router.get_broadcast_targets("/light/brightness", 0);
1356 assert_eq!(targets.count, 0); }
1358}