Skip to main content

worterbuch_common/
lib.rs

1/*
2 *  Worterbuch common modules library
3 *
4 *  Copyright (C) 2024 Michael Bachmann
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU Affero General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU Affero General Public License for more details.
15 *
16 *  You should have received a copy of the GNU Affero General Public License
17 *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20#[cfg(feature = "benchmark")]
21pub mod benchmark;
22mod client;
23pub mod error;
24mod server;
25
26pub use client::*;
27use serde_repr::{Deserialize_repr, Serialize_repr};
28pub use server::*;
29
30use error::WorterbuchResult;
31use serde::{Deserialize, Serialize, de::DeserializeOwned};
32use serde_json::json;
33use std::{fmt, net::SocketAddr, ops::Deref};
34use tokio::sync::{mpsc, oneshot};
35use tracing::Span;
36use uuid::Uuid;
37
38#[cfg(feature = "jemalloc")]
39mod jemalloc;
40#[cfg(feature = "jemalloc")]
41pub mod profiling;
42#[cfg(feature = "redb")]
43pub mod redb;
44
45pub const INTERNAL_CLIENT_ID: ClientId = ClientId::nil();
46
47pub const SYSTEM_TOPIC_ROOT: &str = "$SYS";
48pub const SYSTEM_TOPIC_ROOT_PREFIX: &str = "$SYS/";
49pub const SYSTEM_TOPIC_NAME: &str = "name";
50pub const SYSTEM_TOPIC_CLIENTS: &str = "clients";
51pub const SYSTEM_TOPIC_VERSION: &str = "version";
52pub const SYSTEM_TOPIC_LICENSE: &str = "license";
53pub const SYSTEM_TOPIC_SOURCES: &str = "source-code";
54pub const SYSTEM_TOPIC_SUBSCRIPTIONS: &str = "subscriptions";
55pub const SYSTEM_TOPIC_LOCKS: &str = "locks";
56pub const SYSTEM_TOPIC_CLIENTS_PROTOCOL: &str = "protocol";
57pub const SYSTEM_TOPIC_CLIENTS_PROTOCOL_VERSION: &str = "protocolVersion";
58pub const SYSTEM_TOPIC_CLIENTS_ADDRESS: &str = "address";
59pub const SYSTEM_TOPIC_CLIENTS_TIMESTAMP: &str = "connectedSince";
60pub const SYSTEM_TOPIC_LAST_WILL: &str = "lastWill";
61pub const SYSTEM_TOPIC_GRAVE_GOODS: &str = "graveGoods";
62pub const SYSTEM_TOPIC_CLIENT_NAME: &str = "clientName";
63pub const SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSION: &str = "protocolVersion";
64pub const SYSTEM_TOPIC_MODE: &str = "mode";
65pub const SYSTEM_TOPIC_UPTIME: &str = "uptime";
66pub const SYSTEM_TOPIC_STORE: &str = "store";
67pub const SYSTEM_TOPIC_VALUES: &str = "values";
68pub const SYSTEM_TOPIC_COUNT: &str = "count";
69pub const SYSTEM_TOPIC_JEMALLOC: &str = "jemalloc";
70pub const SYSTEM_TOPIC_RAW: &str = "raw";
71pub const SYSTEM_TOPIC_FORMATTED: &str = "formatted";
72
73pub type TransactionId = u64;
74pub type RequestPattern = String;
75pub type RequestPatterns = Vec<RequestPattern>;
76pub type Key = String;
77pub type Value = serde_json::Value;
78pub type KeyValuePairs = Vec<KeyValuePair>;
79pub type TypedKeyValuePairs<T> = Vec<TypedKeyValuePair<T>>;
80pub type MetaData = String;
81pub type Path = String;
82pub type ProtocolVersionSegment = u32;
83pub type ProtocolMajorVersion = ProtocolVersionSegment;
84pub type ProtocolVersions = Vec<ProtocolVersion>;
85pub type LastWill = KeyValuePairs;
86pub type GraveGoods = RequestPatterns;
87pub type UniqueFlag = bool;
88pub type LiveOnlyFlag = bool;
89pub type AuthToken = String;
90pub type AuthTokenKey = String;
91pub type CasVersion = u64;
92pub type ClientId = Uuid;
93
94#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
95pub enum ValueEntry {
96    Cas(Value, u64),
97    #[serde(untagged)]
98    Plain(Value),
99}
100
101impl AsRef<Value> for ValueEntry {
102    fn as_ref(&self) -> &Value {
103        match self {
104            ValueEntry::Plain(value) => value,
105            ValueEntry::Cas(value, _) => value,
106        }
107    }
108}
109
110impl From<ValueEntry> for Value {
111    fn from(value: ValueEntry) -> Self {
112        match value {
113            ValueEntry::Plain(value) => value,
114            ValueEntry::Cas(value, _) => value,
115        }
116    }
117}
118
119impl From<Value> for ValueEntry {
120    fn from(value: Value) -> Self {
121        ValueEntry::Plain(value)
122    }
123}
124
125#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
126#[serde(rename_all = "camelCase")]
127pub enum Privilege {
128    Read,
129    Write,
130    Delete,
131    Profile,
132    WebLogin,
133}
134
135impl fmt::Display for Privilege {
136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137        match self {
138            Privilege::Read => "read".fmt(f),
139            Privilege::Write => "write".fmt(f),
140            Privilege::Delete => "delete".fmt(f),
141            Privilege::Profile => "profile".fmt(f),
142            Privilege::WebLogin => "web-login".fmt(f),
143        }
144    }
145}
146
147#[derive(Debug, Serialize, Deserialize)]
148#[serde(rename_all = "camelCase")]
149pub enum AuthCheck<'a> {
150    Pattern(&'a str),
151    Flag,
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize)]
155#[serde(rename_all = "camelCase")]
156pub enum AuthCheckOwned {
157    Pattern(String),
158    Flag,
159}
160
161impl<'a> From<AuthCheck<'a>> for AuthCheckOwned {
162    fn from(value: AuthCheck<'a>) -> Self {
163        match value {
164            AuthCheck::Pattern(p) => AuthCheckOwned::Pattern(p.to_owned()),
165            AuthCheck::Flag => AuthCheckOwned::Flag,
166        }
167    }
168}
169
170impl fmt::Display for AuthCheckOwned {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172        match self {
173            AuthCheckOwned::Pattern(p) => p.fmt(f),
174            AuthCheckOwned::Flag => true.fmt(f),
175        }
176    }
177}
178
179#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize_repr, Deserialize_repr)]
180#[repr(u8)]
181pub enum ErrorCode {
182    IllegalWildcard = 0,
183    IllegalMultiWildcard = 1,
184    MultiWildcardAtIllegalPosition = 2,
185    IoError = 3,
186    SerdeError = 4,
187    NoSuchValue = 5,
188    NotSubscribed = 6,
189    ProtocolNegotiationFailed = 7,
190    InvalidServerResponse = 8,
191    ReadOnlyKey = 9,
192    AuthorizationFailed = 10,
193    AuthorizationRequired = 11,
194    AlreadyAuthorized = 12,
195    MissingValue = 13,
196    Unauthorized = 14,
197    NoPubStream = 15,
198    NotLeader = 16,
199    Cas = 17,
200    CasVersionMismatch = 18,
201    NotImplemented = 19,
202    KeyIsLocked = 20,
203    KeyIsNotLocked = 21,
204    LockAcquisitionCancelled = 22,
205    FeatureDisabled = 23,
206    ClientIDCollision = 24,
207    Other = u8::MAX,
208}
209
210impl fmt::Display for ErrorCode {
211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212        (self.to_owned() as u8).fmt(f)
213    }
214}
215
216#[macro_export]
217macro_rules! topic {
218    ($first:expr $(, $rest:expr)*) => {{
219        use std::fmt::Write;
220        let mut s = String::new();
221        write!(s, "{}", $first).expect("writing to a String never fails");
222        $(
223            s.push('/');
224            write!(s, "{}", $rest).expect("writing to a String never fails");
225        )*
226        s
227    }};
228}
229
230pub type Version = String;
231
232#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
233pub struct ProtocolVersion(ProtocolVersionSegment, ProtocolVersionSegment);
234
235impl ProtocolVersion {
236    pub const fn new(major: ProtocolVersionSegment, minor: ProtocolVersionSegment) -> Self {
237        Self(major, minor)
238    }
239
240    pub const fn major(&self) -> ProtocolVersionSegment {
241        self.0
242    }
243
244    pub const fn minor(&self) -> ProtocolVersionSegment {
245        self.1
246    }
247
248    pub fn is_compatible_with_server(&self, server_version: &ProtocolVersion) -> bool {
249        self.major() == server_version.major() && self.minor() <= server_version.minor()
250    }
251
252    pub fn is_compatible_with_client_version(&self, client_version: &ProtocolVersion) -> bool {
253        self.major() == client_version.major() && self.minor() >= client_version.minor()
254    }
255}
256
257impl fmt::Display for ProtocolVersion {
258    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259        write!(f, "{}.{}", self.0, self.1)
260    }
261}
262
263#[derive(Debug, Clone, PartialEq, Eq, Serialize, Hash, Deserialize)]
264pub enum Protocol {
265    TCP,
266    WS,
267    HTTP,
268    UNIX,
269}
270
271#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
272#[serde(rename_all = "camelCase")]
273pub struct KeyValuePair {
274    pub key: Key,
275    pub value: Value,
276}
277
278impl fmt::Display for KeyValuePair {
279    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
280        write!(f, "{}={}", self.key, self.value)
281    }
282}
283
284impl From<KeyValuePair> for Option<Value> {
285    fn from(kvp: KeyValuePair) -> Self {
286        Some(kvp.value)
287    }
288}
289
290impl From<KeyValuePair> for Value {
291    fn from(kvp: KeyValuePair) -> Self {
292        kvp.value
293    }
294}
295
296impl KeyValuePair {
297    pub fn new(key: String, value: Value) -> Self {
298        KeyValuePair { key, value }
299    }
300
301    pub fn of<S: Serialize>(key: impl Into<String>, value: S) -> Self {
302        KeyValuePair::new(key.into(), json!(value))
303    }
304}
305
306#[derive(Debug, Clone, PartialEq, Eq)]
307pub struct TypedKeyValuePair<T: DeserializeOwned> {
308    pub key: Key,
309    pub value: T,
310}
311
312impl<T: DeserializeOwned> TryFrom<KeyValuePair> for TypedKeyValuePair<T> {
313    type Error = serde_json::Error;
314
315    fn try_from(kvp: KeyValuePair) -> Result<Self, Self::Error> {
316        let deserialized = serde_json::from_value(kvp.value)?;
317        Ok(TypedKeyValuePair {
318            key: kvp.key,
319            value: deserialized,
320        })
321    }
322}
323
324impl<S: Serialize> From<(String, S)> for KeyValuePair {
325    fn from((key, value): (String, S)) -> Self {
326        let value = json!(value);
327        KeyValuePair { key, value }
328    }
329}
330
331impl<S: Serialize> From<(&str, S)> for KeyValuePair {
332    fn from((key, value): (&str, S)) -> Self {
333        let value = json!(value);
334        KeyValuePair {
335            key: key.to_owned(),
336            value,
337        }
338    }
339}
340
341// #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Tags)]
342pub type RegularKeySegment = String;
343
344pub fn parse_segments(pattern: &str) -> WorterbuchResult<Vec<RegularKeySegment>> {
345    let mut segments = Vec::new();
346    for segment in pattern.split('/') {
347        let ks: KeySegment = segment.into();
348        match ks {
349            KeySegment::Regular(reg) => segments.push(reg),
350            KeySegment::Wildcard => {
351                return Err(error::WorterbuchError::IllegalWildcard(pattern.to_owned()));
352            }
353            KeySegment::MultiWildcard => {
354                return Err(error::WorterbuchError::IllegalMultiWildcard(
355                    pattern.to_owned(),
356                ));
357            }
358        }
359    }
360    Ok(segments)
361}
362
363#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
364pub enum KeySegment {
365    Regular(RegularKeySegment),
366    Wildcard,
367    MultiWildcard,
368    // RegexWildcard(String),
369}
370
371impl AsRef<str> for KeySegment {
372    fn as_ref(&self) -> &str {
373        match self {
374            KeySegment::Regular(segment) => segment.as_str(),
375            KeySegment::Wildcard => "?",
376            KeySegment::MultiWildcard => "#",
377        }
378    }
379}
380
381pub fn format_path(path: &[impl AsRef<str>]) -> String {
382    let mut path = path.iter().fold(String::new(), |mut a, b| {
383        let b = b.as_ref();
384        a.reserve(b.len() + 1);
385        a.push_str(b);
386        a.push('/');
387        a
388    });
389    path.pop();
390    path
391}
392
393impl From<RegularKeySegment> for KeySegment {
394    fn from(reg: RegularKeySegment) -> Self {
395        Self::Regular(reg)
396    }
397}
398
399impl Deref for KeySegment {
400    type Target = str;
401
402    fn deref(&self) -> &Self::Target {
403        match self {
404            KeySegment::Regular(reg) => reg,
405            KeySegment::Wildcard => "?",
406            KeySegment::MultiWildcard => "#",
407        }
408    }
409}
410
411impl fmt::Display for KeySegment {
412    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
413        match self {
414            KeySegment::Regular(segment) => segment.fmt(f),
415            KeySegment::Wildcard => write!(f, "?"),
416            KeySegment::MultiWildcard => write!(f, "#"),
417            // PathSegment::RegexWildcard(regex) => write!(f, "?{regex}?"),
418        }
419    }
420}
421
422impl From<&str> for KeySegment {
423    fn from(str: &str) -> Self {
424        match str {
425            "?" => KeySegment::Wildcard,
426            "#" => KeySegment::MultiWildcard,
427            other => KeySegment::Regular(other.to_owned()),
428        }
429    }
430}
431
432impl KeySegment {
433    pub fn parse(pattern: impl AsRef<str>) -> Vec<KeySegment> {
434        let segments = pattern.as_ref().split('/');
435        segments.map(KeySegment::from).collect()
436    }
437}
438
439pub fn quote(str: impl AsRef<str>) -> String {
440    let str_ref = str.as_ref();
441    if str_ref.starts_with('\"') && str_ref.ends_with('\"') {
442        str_ref.to_owned()
443    } else {
444        format!("\"{str_ref}\"")
445    }
446}
447
448#[derive(Debug, Clone, Eq, PartialEq, Hash)]
449pub struct SubscriptionId {
450    pub client_id: ClientId,
451    pub transaction_id: TransactionId,
452}
453
454impl SubscriptionId {
455    pub fn new(client_id: ClientId, transaction_id: TransactionId) -> Self {
456        SubscriptionId {
457            client_id,
458            transaction_id,
459        }
460    }
461}
462
463pub trait WbApi {
464    fn supported_protocol_versions(&self) -> Vec<ProtocolVersion>;
465
466    fn version(&self) -> &str;
467
468    fn get(&self, key: Key) -> impl Future<Output = WorterbuchResult<Value>> + Send;
469
470    fn cget(&self, key: Key) -> impl Future<Output = WorterbuchResult<(Value, CasVersion)>> + Send;
471
472    fn pget(
473        &self,
474        pattern: RequestPattern,
475    ) -> impl Future<Output = WorterbuchResult<KeyValuePairs>> + Send;
476
477    fn set(
478        &self,
479        key: Key,
480        value: Value,
481        client_id: ClientId,
482    ) -> impl Future<Output = WorterbuchResult<()>> + Send;
483
484    fn cset(
485        &self,
486        key: Key,
487        value: Value,
488        version: CasVersion,
489        client_id: ClientId,
490    ) -> impl Future<Output = WorterbuchResult<()>> + Send;
491
492    fn lock(
493        &self,
494        key: Key,
495        client_id: ClientId,
496    ) -> impl Future<Output = WorterbuchResult<()>> + Send;
497
498    fn acquire_lock(
499        &self,
500        key: Key,
501        client_id: ClientId,
502    ) -> impl Future<Output = WorterbuchResult<oneshot::Receiver<()>>> + Send;
503
504    fn release_lock(
505        &self,
506        key: Key,
507        client_id: ClientId,
508    ) -> impl Future<Output = WorterbuchResult<()>> + Send;
509
510    fn spub_init(
511        &self,
512        transaction_id: TransactionId,
513        key: Key,
514        client_id: ClientId,
515    ) -> impl Future<Output = WorterbuchResult<()>> + Send;
516
517    fn spub(
518        &self,
519        transaction_id: TransactionId,
520        value: Value,
521        client_id: ClientId,
522    ) -> impl Future<Output = WorterbuchResult<()>> + Send;
523
524    fn publish(&self, key: Key, value: Value) -> impl Future<Output = WorterbuchResult<()>> + Send;
525
526    fn ls(
527        &self,
528        parent: Option<Key>,
529    ) -> impl Future<Output = WorterbuchResult<Vec<RegularKeySegment>>> + Send;
530
531    fn pls(
532        &self,
533        parent: Option<RequestPattern>,
534    ) -> impl Future<Output = WorterbuchResult<Vec<RegularKeySegment>>> + Send;
535
536    fn subscribe(
537        &self,
538        client_id: ClientId,
539        transaction_id: TransactionId,
540        key: Key,
541        unique: bool,
542        live_only: bool,
543    ) -> impl Future<Output = WorterbuchResult<(mpsc::Receiver<StateEvent>, SubscriptionId)>> + Send;
544
545    fn psubscribe(
546        &self,
547        client_id: ClientId,
548        transaction_id: TransactionId,
549        pattern: RequestPattern,
550        unique: bool,
551        live_only: bool,
552    ) -> impl Future<Output = WorterbuchResult<(mpsc::Receiver<PStateEvent>, SubscriptionId)>> + Send;
553
554    fn subscribe_ls(
555        &self,
556        client_id: ClientId,
557        transaction_id: TransactionId,
558        parent: Option<Key>,
559    ) -> impl Future<
560        Output = WorterbuchResult<(mpsc::Receiver<Vec<RegularKeySegment>>, SubscriptionId)>,
561    > + Send;
562
563    fn unsubscribe(
564        &self,
565        client_id: ClientId,
566        transaction_id: TransactionId,
567    ) -> impl Future<Output = WorterbuchResult<()>> + Send;
568
569    fn unsubscribe_ls(
570        &self,
571        client_id: ClientId,
572        transaction_id: TransactionId,
573    ) -> impl Future<Output = WorterbuchResult<()>> + Send;
574
575    fn delete(
576        &self,
577        key: Key,
578        client_id: ClientId,
579    ) -> impl Future<Output = WorterbuchResult<Value>> + Send;
580
581    fn pdelete(
582        &self,
583        pattern: RequestPattern,
584        client_id: ClientId,
585    ) -> impl Future<Output = WorterbuchResult<KeyValuePairs>> + Send;
586
587    fn connected(
588        &self,
589        client_id: ClientId,
590        remote_addr: Option<SocketAddr>,
591        protocol: Protocol,
592    ) -> impl Future<Output = WorterbuchResult<()>> + Send;
593
594    fn protocol_switched(
595        &self,
596        client_id: ClientId,
597        protocol: ProtocolMajorVersion,
598    ) -> impl Future<Output = WorterbuchResult<()>> + Send;
599
600    fn disconnected(
601        &self,
602        client_id: ClientId,
603        remote_addr: Option<SocketAddr>,
604    ) -> impl Future<Output = WorterbuchResult<()>> + Send;
605
606    fn export(
607        &self,
608        span: Span,
609    ) -> impl Future<Output = WorterbuchResult<(Value, GraveGoods, LastWill)>> + Send;
610
611    fn import(
612        &self,
613        json: String,
614    ) -> impl Future<Output = WorterbuchResult<Vec<(String, (ValueEntry, bool))>>> + Send;
615
616    fn entries(&self) -> impl Future<Output = WorterbuchResult<usize>> + Send;
617}
618
619mod macros {
620
621    #[macro_export]
622    macro_rules! while_select {
623        (biased; $($tokens:tt)*) => {
624            '__while_select: loop {
625                match ::tokio::select! { biased; $($tokens)* } {
626                    ::std::ops::ControlFlow::Continue(_) => {}
627                    ::std::ops::ControlFlow::Break(v) => break '__while_select v,
628                }
629            }
630        };
631        ($($tokens:tt)*) => {
632            '__while_select: loop {
633                match ::tokio::select! { $($tokens)* } {
634                    ::std::ops::ControlFlow::Continue(_) => {}
635                    ::std::ops::ControlFlow::Break(v) => break '__while_select v,
636                }
637            }
638        };
639    }
640
641    mod test {
642
643        #![allow(clippy::as_conversions)]
644        #![allow(clippy::unwrap_used)]
645
646        #[tokio::test]
647        async fn while_select_breaks_as_expected_on_control_flow() {
648            use std::{ops::ControlFlow, time::Duration};
649            use tokio::time::sleep;
650
651            let mut fut_a = Box::pin(async { ControlFlow::Break::<&'static str>("hello") });
652            let mut fut_b = Box::pin(async {
653                sleep(Duration::from_secs(1)).await;
654                ControlFlow::Break::<&'static str>("nein")
655            });
656
657            let res = while_select!(
658                it = &mut fut_a => it,
659                it = &mut fut_b => it,
660            );
661
662            assert_eq!("hello", res);
663        }
664
665        #[tokio::test]
666        async fn while_select_biased_breaks_as_expected_on_control_flow() {
667            use std::{ops::ControlFlow, time::Duration};
668            use tokio::time::sleep;
669
670            let mut fut_a = Box::pin(async { ControlFlow::Break::<&'static str>("hello") });
671            let mut fut_b = Box::pin(async {
672                sleep(Duration::from_secs(1)).await;
673                ControlFlow::Break::<&'static str>("nein")
674            });
675
676            let res = while_select!(
677                biased;
678                it = &mut fut_a => it,
679                it = &mut fut_b => it,
680            );
681
682            assert_eq!("hello", res);
683        }
684
685        #[tokio::test]
686        async fn while_select_breaks_as_expected_on_break() {
687            use std::{ops::ControlFlow, time::Duration};
688            use tokio::time::sleep;
689
690            let mut fut_a = Box::pin(async {});
691            let mut fut_b = Box::pin(async {
692                sleep(Duration::from_secs(1)).await;
693                ControlFlow::Break::<&'static str>("nein")
694            });
695
696            let res = while_select!(
697                _ = &mut fut_a => break "hello",
698                it = &mut fut_b => it,
699            );
700
701            assert_eq!("hello", res);
702        }
703    }
704}
705
706#[cfg(test)]
707mod test {
708
709    #![allow(clippy::as_conversions)]
710    #![allow(clippy::unwrap_used)]
711
712    use crate::{ErrorCode, ProtocolVersion};
713    use serde_json::json;
714
715    #[test]
716    fn protocol_versions_are_sorted_correctly() {
717        assert!(ProtocolVersion::new(1, 2) < ProtocolVersion::new(3, 2));
718        assert!(ProtocolVersion::new(1, 2) == ProtocolVersion::new(1, 2));
719        assert!(ProtocolVersion::new(2, 1) > ProtocolVersion::new(1, 9));
720
721        let mut versions = vec![
722            ProtocolVersion::new(1, 2),
723            ProtocolVersion::new(0, 456),
724            ProtocolVersion::new(9, 0),
725            ProtocolVersion::new(3, 15),
726        ];
727        versions.sort();
728        assert_eq!(
729            vec![
730                ProtocolVersion::new(0, 456),
731                ProtocolVersion::new(1, 2),
732                ProtocolVersion::new(3, 15),
733                ProtocolVersion::new(9, 0)
734            ],
735            versions
736        );
737    }
738
739    #[test]
740    fn topic_macro_generates_topic_correctly() {
741        assert_eq!(
742            "hello/world/foo/bar",
743            topic!("hello", "world", "foo", "bar")
744        );
745    }
746
747    #[test]
748    fn error_codes_are_serialized_as_numbers() {
749        assert_eq!(
750            "1",
751            serde_json::to_string(&ErrorCode::IllegalMultiWildcard).unwrap()
752        )
753    }
754
755    #[test]
756    fn error_codes_are_deserialized_from_numbers() {
757        assert_eq!(
758            ErrorCode::ProtocolNegotiationFailed,
759            serde_json::from_str("7").unwrap()
760        )
761    }
762
763    #[test]
764    fn protocol_version_get_serialized_correctly() {
765        assert_eq!(&json!(ProtocolVersion::new(2, 1)).to_string(), "[2,1]")
766    }
767
768    #[test]
769    fn protocol_version_get_formatted_correctly() {
770        assert_eq!(&ProtocolVersion::new(2, 1).to_string(), "2.1")
771    }
772
773    #[test]
774    fn compatible_version_is_selected_correctly() {
775        let client_version = ProtocolVersion::new(1, 2);
776        let server_versions = [
777            ProtocolVersion::new(0, 11),
778            ProtocolVersion::new(1, 6),
779            ProtocolVersion::new(2, 0),
780        ];
781        let compatible_version = server_versions
782            .iter()
783            .find(|v| client_version.is_compatible_with_server(v));
784        assert_eq!(compatible_version, Some(&server_versions[1]))
785    }
786}