cassandra_protocol/frame/
events.rs

1use crate::frame::traits::FromCursor;
2use crate::frame::{Serialize, Version};
3use crate::types::{from_cursor_str, from_cursor_string_list, serialize_str, CIntShort};
4use crate::{error, Error};
5use derive_more::Display;
6use std::cmp::PartialEq;
7use std::convert::TryFrom;
8use std::io::Cursor;
9use std::net::SocketAddr;
10
// ---- Event types -----------------------------------------------------------

/// Event-type string that introduces a [`ServerEvent::TopologyChange`].
const TOPOLOGY_CHANGE: &str = "TOPOLOGY_CHANGE";
/// Event-type string that introduces a [`ServerEvent::StatusChange`].
const STATUS_CHANGE: &str = "STATUS_CHANGE";
/// Event-type string that introduces a [`ServerEvent::SchemaChange`].
const SCHEMA_CHANGE: &str = "SCHEMA_CHANGE";

// ---- Topology changes ------------------------------------------------------

/// String form of [`TopologyChangeType::NewNode`].
const NEW_NODE: &str = "NEW_NODE";
/// String form of [`TopologyChangeType::RemovedNode`].
const REMOVED_NODE: &str = "REMOVED_NODE";

// ---- Status changes --------------------------------------------------------

/// String form of [`StatusChangeType::Up`].
const UP: &str = "UP";
/// String form of [`StatusChangeType::Down`].
const DOWN: &str = "DOWN";

// ---- Schema changes --------------------------------------------------------

/// String form of [`SchemaChangeType::Created`].
const CREATED: &str = "CREATED";
/// String form of [`SchemaChangeType::Updated`].
const UPDATED: &str = "UPDATED";
/// String form of [`SchemaChangeType::Dropped`].
const DROPPED: &str = "DROPPED";

// ---- Schema change targets -------------------------------------------------

/// String form of [`SchemaChangeTarget::Keyspace`].
const KEYSPACE: &str = "KEYSPACE";
/// String form of [`SchemaChangeTarget::Table`].
const TABLE: &str = "TABLE";
/// String form of [`SchemaChangeTarget::Type`].
const TYPE: &str = "TYPE";
/// String form of [`SchemaChangeTarget::Function`].
const FUNCTION: &str = "FUNCTION";
/// String form of [`SchemaChangeTarget::Aggregate`].
const AGGREGATE: &str = "AGGREGATE";
35
/// Detail-free counterpart of `ServerEvent`.
///
/// Carries only the kind of event, not the concrete change. Useful for
/// subscriptions, where only the string representation of an event is needed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
pub enum SimpleServerEvent {
    /// The cluster topology changed.
    TopologyChange,
    /// The status of a node changed.
    StatusChange,
    /// The schema changed.
    SchemaChange,
}
46
47impl SimpleServerEvent {
48    pub fn as_str(&self) -> &'static str {
49        match *self {
50            SimpleServerEvent::TopologyChange => TOPOLOGY_CHANGE,
51            SimpleServerEvent::StatusChange => STATUS_CHANGE,
52            SimpleServerEvent::SchemaChange => SCHEMA_CHANGE,
53        }
54    }
55}
56
57impl From<ServerEvent> for SimpleServerEvent {
58    fn from(event: ServerEvent) -> SimpleServerEvent {
59        match event {
60            ServerEvent::TopologyChange(_) => SimpleServerEvent::TopologyChange,
61            ServerEvent::StatusChange(_) => SimpleServerEvent::StatusChange,
62            ServerEvent::SchemaChange(_) => SimpleServerEvent::SchemaChange,
63        }
64    }
65}
66
67impl<'a> From<&'a ServerEvent> for SimpleServerEvent {
68    fn from(event: &'a ServerEvent) -> SimpleServerEvent {
69        match *event {
70            ServerEvent::TopologyChange(_) => SimpleServerEvent::TopologyChange,
71            ServerEvent::StatusChange(_) => SimpleServerEvent::StatusChange,
72            ServerEvent::SchemaChange(_) => SimpleServerEvent::SchemaChange,
73        }
74    }
75}
76
77impl TryFrom<&str> for SimpleServerEvent {
78    type Error = error::Error;
79
80    fn try_from(value: &str) -> Result<Self, Self::Error> {
81        match value {
82            TOPOLOGY_CHANGE => Ok(SimpleServerEvent::TopologyChange),
83            STATUS_CHANGE => Ok(SimpleServerEvent::StatusChange),
84            SCHEMA_CHANGE => Ok(SimpleServerEvent::SchemaChange),
85            value => Err(Error::UnknownServerEvent(value.into())),
86        }
87    }
88}
89
90impl PartialEq<ServerEvent> for SimpleServerEvent {
91    fn eq(&self, full_event: &ServerEvent) -> bool {
92        self == &SimpleServerEvent::from(full_event)
93    }
94}
95
96/// Full server event that contains all details about a concrete change.
97#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
98#[non_exhaustive]
99pub enum ServerEvent {
100    /// Events related to change in the cluster topology
101    TopologyChange(TopologyChange),
102    /// Events related to change of node status.
103    StatusChange(StatusChange),
104    /// Events related to schema change.
105    SchemaChange(SchemaChange),
106}
107
108impl Serialize for ServerEvent {
109    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
110        match &self {
111            ServerEvent::TopologyChange(t) => {
112                serialize_str(cursor, TOPOLOGY_CHANGE, version);
113                t.serialize(cursor, version);
114            }
115            ServerEvent::StatusChange(s) => {
116                serialize_str(cursor, STATUS_CHANGE, version);
117                s.serialize(cursor, version);
118            }
119            ServerEvent::SchemaChange(s) => {
120                serialize_str(cursor, SCHEMA_CHANGE, version);
121                s.serialize(cursor, version);
122            }
123        }
124    }
125}
126
127impl PartialEq<SimpleServerEvent> for ServerEvent {
128    fn eq(&self, event: &SimpleServerEvent) -> bool {
129        &SimpleServerEvent::from(self) == event
130    }
131}
132
133impl FromCursor for ServerEvent {
134    fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<ServerEvent> {
135        let event_type = from_cursor_str(cursor)?;
136        match event_type {
137            TOPOLOGY_CHANGE => Ok(ServerEvent::TopologyChange(TopologyChange::from_cursor(
138                cursor, version,
139            )?)),
140            STATUS_CHANGE => Ok(ServerEvent::StatusChange(StatusChange::from_cursor(
141                cursor, version,
142            )?)),
143            SCHEMA_CHANGE => Ok(ServerEvent::SchemaChange(SchemaChange::from_cursor(
144                cursor, version,
145            )?)),
146            _ => Err(Error::UnknownServerEvent(event_type.into())),
147        }
148    }
149}
150
151/// Events related to change in the cluster topology
152#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
153pub struct TopologyChange {
154    pub change_type: TopologyChangeType,
155    pub addr: SocketAddr,
156}
157
158impl Serialize for TopologyChange {
159    //noinspection DuplicatedCode
160    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
161        self.change_type.serialize(cursor, version);
162        self.addr.serialize(cursor, version);
163    }
164}
165
166impl FromCursor for TopologyChange {
167    fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<TopologyChange> {
168        let change_type = TopologyChangeType::from_cursor(cursor, version)?;
169        let addr = SocketAddr::from_cursor(cursor, version)?;
170
171        Ok(TopologyChange { change_type, addr })
172    }
173}
174
175#[derive(Debug, Copy, Clone, PartialEq, Ord, PartialOrd, Eq, Hash, Display)]
176#[non_exhaustive]
177pub enum TopologyChangeType {
178    NewNode,
179    RemovedNode,
180}
181
182impl Serialize for TopologyChangeType {
183    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
184        match &self {
185            TopologyChangeType::NewNode => serialize_str(cursor, NEW_NODE, version),
186            TopologyChangeType::RemovedNode => serialize_str(cursor, REMOVED_NODE, version),
187        }
188    }
189}
190
191impl FromCursor for TopologyChangeType {
192    fn from_cursor(
193        cursor: &mut Cursor<&[u8]>,
194        _version: Version,
195    ) -> error::Result<TopologyChangeType> {
196        from_cursor_str(cursor).and_then(|tc| match tc {
197            NEW_NODE => Ok(TopologyChangeType::NewNode),
198            REMOVED_NODE => Ok(TopologyChangeType::RemovedNode),
199            _ => Err(Error::UnexpectedTopologyChangeType(tc.into())),
200        })
201    }
202}
203
204/// Events related to change of node status.
205#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
206pub struct StatusChange {
207    pub change_type: StatusChangeType,
208    pub addr: SocketAddr,
209}
210
211impl Serialize for StatusChange {
212    //noinspection DuplicatedCode
213    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
214        self.change_type.serialize(cursor, version);
215        self.addr.serialize(cursor, version);
216    }
217}
218
219impl FromCursor for StatusChange {
220    fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<StatusChange> {
221        let change_type = StatusChangeType::from_cursor(cursor, version)?;
222        let addr = SocketAddr::from_cursor(cursor, version)?;
223
224        Ok(StatusChange { change_type, addr })
225    }
226}
227
228#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
229#[non_exhaustive]
230pub enum StatusChangeType {
231    Up,
232    Down,
233}
234
235impl Serialize for StatusChangeType {
236    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
237        match self {
238            StatusChangeType::Up => serialize_str(cursor, UP, version),
239            StatusChangeType::Down => serialize_str(cursor, DOWN, version),
240        }
241    }
242}
243
244impl FromCursor for StatusChangeType {
245    fn from_cursor(
246        cursor: &mut Cursor<&[u8]>,
247        _version: Version,
248    ) -> error::Result<StatusChangeType> {
249        from_cursor_str(cursor).and_then(|sct| match sct {
250            UP => Ok(StatusChangeType::Up),
251            DOWN => Ok(StatusChangeType::Down),
252            _ => Err(Error::UnexpectedStatusChangeType(sct.into())),
253        })
254    }
255}
256
257/// Events related to schema change.
258#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
259pub struct SchemaChange {
260    pub change_type: SchemaChangeType,
261    pub target: SchemaChangeTarget,
262    pub options: SchemaChangeOptions,
263}
264
265impl Serialize for SchemaChange {
266    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
267        self.change_type.serialize(cursor, version);
268        self.target.serialize(cursor, version);
269        self.options.serialize(cursor, version);
270    }
271}
272
273impl FromCursor for SchemaChange {
274    fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<SchemaChange> {
275        let change_type = SchemaChangeType::from_cursor(cursor, version)?;
276        let target = SchemaChangeTarget::from_cursor(cursor, version)?;
277        let options = SchemaChangeOptions::from_cursor_and_target(cursor, &target)?;
278
279        Ok(SchemaChange {
280            change_type,
281            target,
282            options,
283        })
284    }
285}
286
287/// Represents type of changes.
288#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
289#[non_exhaustive]
290pub enum SchemaChangeType {
291    Created,
292    Updated,
293    Dropped,
294}
295
296impl Serialize for SchemaChangeType {
297    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
298        match self {
299            SchemaChangeType::Created => serialize_str(cursor, CREATED, version),
300            SchemaChangeType::Updated => serialize_str(cursor, UPDATED, version),
301            SchemaChangeType::Dropped => serialize_str(cursor, DROPPED, version),
302        }
303    }
304}
305
306impl FromCursor for SchemaChangeType {
307    fn from_cursor(
308        cursor: &mut Cursor<&[u8]>,
309        _version: Version,
310    ) -> error::Result<SchemaChangeType> {
311        from_cursor_str(cursor).and_then(|ct| match ct {
312            CREATED => Ok(SchemaChangeType::Created),
313            UPDATED => Ok(SchemaChangeType::Updated),
314            DROPPED => Ok(SchemaChangeType::Dropped),
315            _ => Err(Error::UnexpectedSchemaChangeType(ct.into())),
316        })
317    }
318}
319
320/// Refers to a target of changes were made.
321#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
322#[non_exhaustive]
323pub enum SchemaChangeTarget {
324    Keyspace,
325    Table,
326    Type,
327    Function,
328    Aggregate,
329}
330
331impl Serialize for SchemaChangeTarget {
332    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
333        match self {
334            SchemaChangeTarget::Keyspace => serialize_str(cursor, KEYSPACE, version),
335            SchemaChangeTarget::Table => serialize_str(cursor, TABLE, version),
336            SchemaChangeTarget::Type => serialize_str(cursor, TYPE, version),
337            SchemaChangeTarget::Function => serialize_str(cursor, FUNCTION, version),
338            SchemaChangeTarget::Aggregate => serialize_str(cursor, AGGREGATE, version),
339        }
340    }
341}
342
343impl FromCursor for SchemaChangeTarget {
344    fn from_cursor(
345        cursor: &mut Cursor<&[u8]>,
346        _version: Version,
347    ) -> error::Result<SchemaChangeTarget> {
348        from_cursor_str(cursor).and_then(|t| match t {
349            KEYSPACE => Ok(SchemaChangeTarget::Keyspace),
350            TABLE => Ok(SchemaChangeTarget::Table),
351            TYPE => Ok(SchemaChangeTarget::Type),
352            FUNCTION => Ok(SchemaChangeTarget::Function),
353            AGGREGATE => Ok(SchemaChangeTarget::Aggregate),
354            _ => Err(Error::UnexpectedSchemaChangeTarget(t.into())),
355        })
356    }
357}
358
/// Names identifying the schema object that was changed.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
pub enum SchemaChangeOptions {
    /// Keyspace-level change. Holds the keyspace name.
    Keyspace(String),
    /// Change to a table or a type. Holds the keyspace name and the name of
    /// the table/type.
    TableType(String, String),
    /// Change to a function or an aggregate. Holds, in order:
    /// * the keyspace containing the user defined function/aggregate
    /// * the function/aggregate name
    /// * one string per argument type (as CQL type)
    FunctionAggregate(String, String, Vec<String>),
}
373
374impl Serialize for SchemaChangeOptions {
375    fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
376        match self {
377            SchemaChangeOptions::Keyspace(ks) => {
378                serialize_str(cursor, ks, version);
379            }
380            SchemaChangeOptions::TableType(ks, t) => {
381                serialize_str(cursor, ks, version);
382                serialize_str(cursor, t, version);
383            }
384            SchemaChangeOptions::FunctionAggregate(ks, fa_name, list) => {
385                serialize_str(cursor, ks, version);
386                serialize_str(cursor, fa_name, version);
387
388                let len = list.len() as CIntShort;
389                len.serialize(cursor, version);
390                list.iter().for_each(|x| serialize_str(cursor, x, version));
391            }
392        }
393    }
394}
395
396impl SchemaChangeOptions {
397    fn from_cursor_and_target(
398        cursor: &mut Cursor<&[u8]>,
399        target: &SchemaChangeTarget,
400    ) -> error::Result<SchemaChangeOptions> {
401        Ok(match *target {
402            SchemaChangeTarget::Keyspace => SchemaChangeOptions::from_cursor_keyspace(cursor)?,
403            SchemaChangeTarget::Table | SchemaChangeTarget::Type => {
404                SchemaChangeOptions::from_cursor_table_type(cursor)?
405            }
406            SchemaChangeTarget::Function | SchemaChangeTarget::Aggregate => {
407                SchemaChangeOptions::from_cursor_function_aggregate(cursor)?
408            }
409        })
410    }
411
412    fn from_cursor_keyspace(cursor: &mut Cursor<&[u8]>) -> error::Result<SchemaChangeOptions> {
413        Ok(SchemaChangeOptions::Keyspace(
414            from_cursor_str(cursor)?.to_string(),
415        ))
416    }
417
418    fn from_cursor_table_type(cursor: &mut Cursor<&[u8]>) -> error::Result<SchemaChangeOptions> {
419        let keyspace = from_cursor_str(cursor)?.to_string();
420        let name = from_cursor_str(cursor)?.to_string();
421        Ok(SchemaChangeOptions::TableType(keyspace, name))
422    }
423
424    fn from_cursor_function_aggregate(
425        cursor: &mut Cursor<&[u8]>,
426    ) -> error::Result<SchemaChangeOptions> {
427        let keyspace = from_cursor_str(cursor)?.to_string();
428        let name = from_cursor_str(cursor)?.to_string();
429        let types = from_cursor_string_list(cursor)?;
430        Ok(SchemaChangeOptions::FunctionAggregate(
431            keyspace, name, types,
432        ))
433    }
434}
435
/// Test helper: checks that `bytes` decode (as protocol V4) to `expected`
/// and that `expected` serializes back to exactly `bytes`.
#[cfg(test)]
fn test_encode_decode(bytes: &[u8], expected: ServerEvent) {
    // bytes -> event
    let mut input: Cursor<&[u8]> = Cursor::new(bytes);
    let decoded = ServerEvent::from_cursor(&mut input, Version::V4).unwrap();
    assert_eq!(expected, decoded);

    // event -> bytes
    let mut encoded = Vec::new();
    expected.serialize(&mut Cursor::new(&mut encoded), Version::V4);
    assert_eq!(encoded, bytes);
}
447
/// Tests for encoding and decoding `TopologyChangeType`.
#[cfg(test)]
mod topology_change_type_test {
    use super::*;
    use crate::frame::traits::FromCursor;
    use std::io::Cursor;

    /// Both known strings decode to their variants.
    #[test]
    fn from_cursor() {
        // "NEW_NODE"
        let a = &[0, 8, 78, 69, 87, 95, 78, 79, 68, 69];
        let mut new_node: Cursor<&[u8]> = Cursor::new(a);
        assert_eq!(
            TopologyChangeType::from_cursor(&mut new_node, Version::V4).unwrap(),
            TopologyChangeType::NewNode
        );

        // "REMOVED_NODE"
        let b = &[0, 12, 82, 69, 77, 79, 86, 69, 68, 95, 78, 79, 68, 69];
        let mut removed_node: Cursor<&[u8]> = Cursor::new(b);
        assert_eq!(
            TopologyChangeType::from_cursor(&mut removed_node, Version::V4).unwrap(),
            TopologyChangeType::RemovedNode
        );
    }

    /// Both variants encode to their length-prefixed strings.
    #[test]
    fn serialize() {
        {
            let a = &[0, 8, 78, 69, 87, 95, 78, 79, 68, 69];
            let mut buffer = Vec::new();
            let mut cursor = Cursor::new(&mut buffer);
            let new_node = TopologyChangeType::NewNode;
            new_node.serialize(&mut cursor, Version::V4);
            assert_eq!(buffer, a);
        }

        {
            let b = &[0, 12, 82, 69, 77, 79, 86, 69, 68, 95, 78, 79, 68, 69];
            let mut buffer = Vec::new();
            let mut cursor = Cursor::new(&mut buffer);
            let removed_node = TopologyChangeType::RemovedNode;
            removed_node.serialize(&mut cursor, Version::V4);
            assert_eq!(buffer, b);
        }
    }

    /// An unknown string ("N") is rejected with the dedicated error variant.
    ///
    /// Asserting on the variant, rather than using `#[should_panic]`, keeps
    /// the test from passing on an unrelated panic.
    #[test]
    fn from_cursor_wrong() {
        let a = &[0, 1, 78];
        let mut wrong: Cursor<&[u8]> = Cursor::new(a);
        let err = TopologyChangeType::from_cursor(&mut wrong, Version::V4).unwrap_err();

        assert!(matches!(err, Error::UnexpectedTopologyChangeType(_)));
    }
}
500
/// Tests for encoding and decoding `StatusChangeType`.
#[cfg(test)]
mod status_change_type_test {
    use super::*;
    use crate::frame::traits::FromCursor;
    use std::io::Cursor;

    /// Decodes a status change type from raw bytes using protocol V4.
    fn decode(bytes: &[u8]) -> Result<StatusChangeType, Error> {
        let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
        StatusChangeType::from_cursor(&mut cursor, Version::V4)
    }

    /// Encodes a status change type into a fresh buffer using protocol V4.
    fn encode(value: StatusChangeType) -> Vec<u8> {
        let mut buffer = Vec::new();
        let mut cursor = Cursor::new(&mut buffer);
        value.serialize(&mut cursor, Version::V4);
        buffer
    }

    /// Both known strings decode to their variants.
    #[test]
    fn from_cursor() {
        // "UP"
        assert_eq!(decode(&[0, 2, 85, 80]).unwrap(), StatusChangeType::Up);
        // "DOWN"
        assert_eq!(
            decode(&[0, 4, 68, 79, 87, 78]).unwrap(),
            StatusChangeType::Down
        );
    }

    /// Both variants encode to their length-prefixed strings.
    #[test]
    fn serialize() {
        assert_eq!(encode(StatusChangeType::Up), &[0, 2, 85, 80]);
        assert_eq!(encode(StatusChangeType::Down), &[0, 4, 68, 79, 87, 78]);
    }

    /// An unknown string ("N") is rejected with the dedicated error variant.
    #[test]
    fn from_cursor_wrong() {
        let err = decode(&[0, 1, 78]).unwrap_err();

        assert!(matches!(err, Error::UnexpectedStatusChangeType(_)));
    }
}
554
/// Tests for encoding and decoding `SchemaChangeType`.
#[cfg(test)]
mod schema_change_type_test {
    use super::*;
    use crate::frame::traits::FromCursor;
    use std::io::Cursor;

    /// All known strings decode to their variants.
    #[test]
    fn from_cursor() {
        // "CREATED"
        let a = &[0, 7, 67, 82, 69, 65, 84, 69, 68];
        let mut created: Cursor<&[u8]> = Cursor::new(a);
        assert_eq!(
            SchemaChangeType::from_cursor(&mut created, Version::V4).unwrap(),
            SchemaChangeType::Created
        );

        // "UPDATED"
        let b = &[0, 7, 85, 80, 68, 65, 84, 69, 68];
        let mut updated: Cursor<&[u8]> = Cursor::new(b);
        assert_eq!(
            SchemaChangeType::from_cursor(&mut updated, Version::V4).unwrap(),
            SchemaChangeType::Updated
        );

        // "DROPPED"
        let c = &[0, 7, 68, 82, 79, 80, 80, 69, 68];
        let mut dropped: Cursor<&[u8]> = Cursor::new(c);
        assert_eq!(
            SchemaChangeType::from_cursor(&mut dropped, Version::V4).unwrap(),
            SchemaChangeType::Dropped
        );
    }

    /// All variants encode to their length-prefixed strings.
    #[test]
    fn serialize() {
        {
            let a = &[0, 7, 67, 82, 69, 65, 84, 69, 68];
            let mut buffer = Vec::new();
            let mut cursor = Cursor::new(&mut buffer);
            let created = SchemaChangeType::Created;
            created.serialize(&mut cursor, Version::V4);
            assert_eq!(buffer, a);
        }
        {
            let b = &[0, 7, 85, 80, 68, 65, 84, 69, 68];
            let mut buffer = Vec::new();
            let mut cursor = Cursor::new(&mut buffer);
            let updated = SchemaChangeType::Updated;
            updated.serialize(&mut cursor, Version::V4);
            assert_eq!(buffer, b);
        }

        {
            let c = &[0, 7, 68, 82, 79, 80, 80, 69, 68];
            let mut buffer = Vec::new();
            let mut cursor = Cursor::new(&mut buffer);
            let dropped = SchemaChangeType::Dropped;
            dropped.serialize(&mut cursor, Version::V4);
            assert_eq!(buffer, c);
        }
    }

    /// An unknown string ("N") is rejected with the dedicated error variant.
    ///
    /// Asserting on the variant, rather than using `#[should_panic]`, keeps
    /// the test from passing on an unrelated panic.
    #[test]
    fn from_cursor_wrong() {
        let a = &[0, 1, 78];
        let mut wrong: Cursor<&[u8]> = Cursor::new(a);
        let err = SchemaChangeType::from_cursor(&mut wrong, Version::V4).unwrap_err();

        assert!(matches!(err, Error::UnexpectedSchemaChangeType(_)));
    }
}
622
/// Tests for encoding and decoding `SchemaChangeTarget`.
#[cfg(test)]
mod schema_change_target_test {
    use super::*;
    use crate::frame::traits::FromCursor;
    use std::io::Cursor;

    /// All known strings decode to their variants.
    #[test]
    #[allow(clippy::many_single_char_names)]
    fn schema_change_target() {
        {
            // "KEYSPACE"
            let bytes = &[0, 8, 75, 69, 89, 83, 80, 65, 67, 69];
            let mut keyspace: Cursor<&[u8]> = Cursor::new(bytes);
            assert_eq!(
                SchemaChangeTarget::from_cursor(&mut keyspace, Version::V4).unwrap(),
                SchemaChangeTarget::Keyspace
            );
        }

        // "TABLE"
        let b = &[0, 5, 84, 65, 66, 76, 69];
        let mut table: Cursor<&[u8]> = Cursor::new(b);
        assert_eq!(
            SchemaChangeTarget::from_cursor(&mut table, Version::V4).unwrap(),
            SchemaChangeTarget::Table
        );

        // "TYPE"
        let c = &[0, 4, 84, 89, 80, 69];
        let mut _type: Cursor<&[u8]> = Cursor::new(c);
        assert_eq!(
            SchemaChangeTarget::from_cursor(&mut _type, Version::V4).unwrap(),
            SchemaChangeTarget::Type
        );

        // "FUNCTION"
        let d = &[0, 8, 70, 85, 78, 67, 84, 73, 79, 78];
        let mut function: Cursor<&[u8]> = Cursor::new(d);
        assert_eq!(
            SchemaChangeTarget::from_cursor(&mut function, Version::V4).unwrap(),
            SchemaChangeTarget::Function
        );

        // "AGGREGATE"
        let e = &[0, 9, 65, 71, 71, 82, 69, 71, 65, 84, 69];
        let mut aggregate: Cursor<&[u8]> = Cursor::new(e);
        assert_eq!(
            SchemaChangeTarget::from_cursor(&mut aggregate, Version::V4).unwrap(),
            SchemaChangeTarget::Aggregate
        );
    }

    /// All variants encode to their length-prefixed strings.
    #[test]
    fn serialize() {
        {
            let a = &[0, 8, 75, 69, 89, 83, 80, 65, 67, 69];
            let mut buffer = Vec::new();
            let mut cursor = Cursor::new(&mut buffer);
            let keyspace = SchemaChangeTarget::Keyspace;
            keyspace.serialize(&mut cursor, Version::V4);
            assert_eq!(buffer, a);
        }

        {
            let b = &[0, 5, 84, 65, 66, 76, 69];
            let mut buffer = Vec::new();
            let mut cursor = Cursor::new(&mut buffer);
            let table = SchemaChangeTarget::Table;
            table.serialize(&mut cursor, Version::V4);
            assert_eq!(buffer, b);
        }

        {
            let c = &[0, 4, 84, 89, 80, 69];
            let mut buffer = Vec::new();
            let mut cursor = Cursor::new(&mut buffer);
            let target_type = SchemaChangeTarget::Type;
            target_type.serialize(&mut cursor, Version::V4);
            assert_eq!(buffer, c);
        }

        {
            let d = &[0, 8, 70, 85, 78, 67, 84, 73, 79, 78];
            let mut buffer = Vec::new();
            let mut cursor = Cursor::new(&mut buffer);
            let function = SchemaChangeTarget::Function;
            function.serialize(&mut cursor, Version::V4);
            assert_eq!(buffer, d);
        }

        {
            let e = &[0, 9, 65, 71, 71, 82, 69, 71, 65, 84, 69];
            let mut buffer = Vec::new();
            let mut cursor = Cursor::new(&mut buffer);
            let aggregate = SchemaChangeTarget::Aggregate;
            aggregate.serialize(&mut cursor, Version::V4);
            assert_eq!(buffer, e);
        }
    }

    /// An unknown string ("N") is rejected with the dedicated error variant.
    ///
    /// Asserting on the variant, rather than using `#[should_panic]`, keeps
    /// the test from passing on an unrelated panic.
    #[test]
    fn from_cursor_wrong() {
        let a = &[0, 1, 78];
        let mut wrong: Cursor<&[u8]> = Cursor::new(a);
        let err = SchemaChangeTarget::from_cursor(&mut wrong, Version::V4).unwrap_err();

        assert!(matches!(err, Error::UnexpectedSchemaChangeTarget(_)));
    }
}
726
727#[cfg(test)]
728mod server_event {
729    use super::*;
730
731    #[test]
732    fn topology_change_new_node() {
733        let bytes = &[
734            // topology change
735            0, 15, 84, 79, 80, 79, 76, 79, 71, 89, 95, 67, 72, 65, 78, 71, 69, // new node
736            0, 8, 78, 69, 87, 95, 78, 79, 68, 69, //
737            4, 127, 0, 0, 1, 0, 0, 0, 1, // 127.0.0.1:1
738        ];
739
740        let expected = ServerEvent::TopologyChange(TopologyChange {
741            change_type: TopologyChangeType::NewNode,
742            addr: "127.0.0.1:1".parse().unwrap(),
743        });
744
745        test_encode_decode(bytes, expected);
746    }
747
748    #[test]
749    fn topology_change_removed_node() {
750        let bytes = &[
751            // topology change
752            0, 15, 84, 79, 80, 79, 76, 79, 71, 89, 95, 67, 72, 65, 78, 71, 69,
753            // removed node
754            0, 12, 82, 69, 77, 79, 86, 69, 68, 95, 78, 79, 68, 69, //
755            4, 127, 0, 0, 1, 0, 0, 0, 1, // 127.0.0.1:1
756        ];
757
758        let expected = ServerEvent::TopologyChange(TopologyChange {
759            change_type: TopologyChangeType::RemovedNode,
760            addr: "127.0.0.1:1".parse().unwrap(),
761        });
762
763        test_encode_decode(bytes, expected);
764    }
765
766    #[test]
767    fn status_change_up() {
768        let bytes = &[
769            // status change
770            0, 13, 83, 84, 65, 84, 85, 83, 95, 67, 72, 65, 78, 71, 69, // up
771            0, 2, 85, 80, //
772            4, 127, 0, 0, 1, 0, 0, 0, 1, // 127.0.0.1:1
773        ];
774
775        let expected = ServerEvent::StatusChange(StatusChange {
776            change_type: StatusChangeType::Up,
777            addr: "127.0.0.1:1".parse().unwrap(),
778        });
779
780        test_encode_decode(bytes, expected);
781    }
782
783    #[test]
784    fn status_change_down() {
785        let bytes = &[
786            // status change
787            0, 13, 83, 84, 65, 84, 85, 83, 95, 67, 72, 65, 78, 71, 69, // down
788            0, 4, 68, 79, 87, 78, //
789            4, 127, 0, 0, 1, 0, 0, 0, 1, // 127.0.0.1:1
790        ];
791
792        let expected = ServerEvent::StatusChange(StatusChange {
793            change_type: StatusChangeType::Down,
794            addr: "127.0.0.1:1".parse().unwrap(),
795        });
796
797        test_encode_decode(bytes, expected);
798    }
799
800    #[test]
801    fn schema_change_created() {
802        // keyspace
803        {
804            let bytes = &[
805                // schema change
806                0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, // created
807                0, 7, 67, 82, 69, 65, 84, 69, 68, // keyspace
808                0, 8, 75, 69, 89, 83, 80, 65, 67, 69, // my_ks
809                0, 5, 109, 121, 95, 107, 115,
810            ];
811            let expected = ServerEvent::SchemaChange(SchemaChange {
812                change_type: SchemaChangeType::Created,
813                target: SchemaChangeTarget::Keyspace,
814                options: SchemaChangeOptions::Keyspace("my_ks".to_string()),
815            });
816
817            test_encode_decode(bytes, expected);
818        }
819
820        // table
821        {
822            let bytes = &[
823                // schema change
824                0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, // created
825                0, 7, 67, 82, 69, 65, 84, 69, 68, // table
826                0, 5, 84, 65, 66, 76, 69, // my_ks
827                0, 5, 109, 121, 95, 107, 115, // my_table
828                0, 8, 109, 121, 95, 116, 97, 98, 108, 101,
829            ];
830            let expected = ServerEvent::SchemaChange(SchemaChange {
831                change_type: SchemaChangeType::Created,
832                target: SchemaChangeTarget::Table,
833                options: SchemaChangeOptions::TableType(
834                    "my_ks".to_string(),
835                    "my_table".to_string(),
836                ),
837            });
838            test_encode_decode(bytes, expected);
839        }
840
841        // type
842        {
843            let bytes = &[
844                // schema change
845                0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, // created
846                0, 7, 67, 82, 69, 65, 84, 69, 68, // type
847                0, 4, 84, 89, 80, 69, // my_ks
848                0, 5, 109, 121, 95, 107, 115, // my_table
849                0, 8, 109, 121, 95, 116, 97, 98, 108, 101,
850            ];
851            let expected = ServerEvent::SchemaChange(SchemaChange {
852                change_type: SchemaChangeType::Created,
853                target: SchemaChangeTarget::Type,
854                options: SchemaChangeOptions::TableType(
855                    "my_ks".to_string(),
856                    "my_table".to_string(),
857                ),
858            });
859            test_encode_decode(bytes, expected);
860        }
861
862        {
863            // function
864            let bytes = &[
865                // schema change
866                0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, // created
867                0, 7, 67, 82, 69, 65, 84, 69, 68, // function
868                0, 8, 70, 85, 78, 67, 84, 73, 79, 78, // my_ks
869                0, 5, 109, 121, 95, 107, 115, // name
870                0, 4, 110, 97, 109, 101, // empty list of parameters
871                0, 0,
872            ];
873            let expected = ServerEvent::SchemaChange(SchemaChange {
874                change_type: SchemaChangeType::Created,
875                target: SchemaChangeTarget::Function,
876                options: SchemaChangeOptions::FunctionAggregate(
877                    "my_ks".to_string(),
878                    "name".to_string(),
879                    Vec::new(),
880                ),
881            });
882            test_encode_decode(bytes, expected);
883        }
884
885        {
886            // aggregate
887            let bytes = &[
888                // schema change
889                0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, // created
890                0, 7, 67, 82, 69, 65, 84, 69, 68, // aggregate
891                0, 9, 65, 71, 71, 82, 69, 71, 65, 84, 69, // my_ks
892                0, 5, 109, 121, 95, 107, 115, // name
893                0, 4, 110, 97, 109, 101, // empty list of parameters
894                0, 0,
895            ];
896            let expected = ServerEvent::SchemaChange(SchemaChange {
897                change_type: SchemaChangeType::Created,
898                target: SchemaChangeTarget::Aggregate,
899                options: SchemaChangeOptions::FunctionAggregate(
900                    "my_ks".to_string(),
901                    "name".to_string(),
902                    Vec::new(),
903                ),
904            });
905            test_encode_decode(bytes, expected);
906        }
907    }
908
909    #[test]
910    fn schema_change_updated() {
911        // keyspace
912        {
913            let bytes = &[
914                // schema change
915                0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, // updated
916                0, 7, 85, 80, 68, 65, 84, 69, 68, // keyspace
917                0, 8, 75, 69, 89, 83, 80, 65, 67, 69, // my_ks
918                0, 5, 109, 121, 95, 107, 115,
919            ];
920            let expected = ServerEvent::SchemaChange(SchemaChange {
921                change_type: SchemaChangeType::Updated,
922                target: SchemaChangeTarget::Keyspace,
923                options: SchemaChangeOptions::Keyspace("my_ks".to_string()),
924            });
925            test_encode_decode(bytes, expected);
926        }
927
928        // table
929        {
930            let bytes = &[
931                // schema change
932                0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, // updated
933                0, 7, 85, 80, 68, 65, 84, 69, 68, // table
934                0, 5, 84, 65, 66, 76, 69, // my_ks
935                0, 5, 109, 121, 95, 107, 115, // my_table
936                0, 8, 109, 121, 95, 116, 97, 98, 108, 101,
937            ];
938            let expected = ServerEvent::SchemaChange(SchemaChange {
939                change_type: SchemaChangeType::Updated,
940                target: SchemaChangeTarget::Table,
941                options: SchemaChangeOptions::TableType(
942                    "my_ks".to_string(),
943                    "my_table".to_string(),
944                ),
945            });
946            test_encode_decode(bytes, expected);
947        }
948
949        // type
950        {
951            let bytes = &[
952                // schema change
953                0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, // updated
954                0, 7, 85, 80, 68, 65, 84, 69, 68, // type
955                0, 4, 84, 89, 80, 69, // my_ks
956                0, 5, 109, 121, 95, 107, 115, // my_table
957                0, 8, 109, 121, 95, 116, 97, 98, 108, 101,
958            ];
959            let expected = ServerEvent::SchemaChange(SchemaChange {
960                change_type: SchemaChangeType::Updated,
961                target: SchemaChangeTarget::Type,
962                options: SchemaChangeOptions::TableType(
963                    "my_ks".to_string(),
964                    "my_table".to_string(),
965                ),
966            });
967            test_encode_decode(bytes, expected);
968        }
969
970        // function
971        {
972            let bytes = &[
973                // schema change
974                0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, // updated
975                0, 7, 85, 80, 68, 65, 84, 69, 68, // function
976                0, 8, 70, 85, 78, 67, 84, 73, 79, 78, // my_ks
977                0, 5, 109, 121, 95, 107, 115, // name
978                0, 4, 110, 97, 109, 101, // empty list of parameters
979                0, 0,
980            ];
981            let expected = ServerEvent::SchemaChange(SchemaChange {
982                change_type: SchemaChangeType::Updated,
983                target: SchemaChangeTarget::Function,
984                options: SchemaChangeOptions::FunctionAggregate(
985                    "my_ks".to_string(),
986                    "name".to_string(),
987                    Vec::new(),
988                ),
989            });
990            test_encode_decode(bytes, expected);
991        }
992
993        // aggreate
994        {
995            let bytes = &[
996                // schema change
997                0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, // updated
998                0, 7, 85, 80, 68, 65, 84, 69, 68, // aggregate
999                0, 9, 65, 71, 71, 82, 69, 71, 65, 84, 69, // my_ks
1000                0, 5, 109, 121, 95, 107, 115, // name
1001                0, 4, 110, 97, 109, 101, // empty list of parameters
1002                0, 0,
1003            ];
1004            let expected = ServerEvent::SchemaChange(SchemaChange {
1005                change_type: SchemaChangeType::Updated,
1006                target: SchemaChangeTarget::Aggregate,
1007                options: SchemaChangeOptions::FunctionAggregate(
1008                    "my_ks".to_string(),
1009                    "name".to_string(),
1010                    Vec::new(),
1011                ),
1012            });
1013            test_encode_decode(bytes, expected);
1014        }
1015    }
1016
1017    #[test]
1018    fn schema_change_dropped() {
1019        // keyspace
1020        {
1021            let bytes = &[
1022                // schema change
1023                0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, // dropped
1024                0, 7, 68, 82, 79, 80, 80, 69, 68, // keyspace
1025                0, 8, 75, 69, 89, 83, 80, 65, 67, 69, // my_ks
1026                0, 5, 109, 121, 95, 107, 115,
1027            ];
1028            let expected = ServerEvent::SchemaChange(SchemaChange {
1029                change_type: SchemaChangeType::Dropped,
1030                target: SchemaChangeTarget::Keyspace,
1031                options: SchemaChangeOptions::Keyspace("my_ks".to_string()),
1032            });
1033            test_encode_decode(bytes, expected);
1034        }
1035
1036        // table
1037        {
1038            let bytes = &[
1039                // schema change
1040                0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, // dropped
1041                0, 7, 68, 82, 79, 80, 80, 69, 68, // table
1042                0, 5, 84, 65, 66, 76, 69, // my_ks
1043                0, 5, 109, 121, 95, 107, 115, // my_table
1044                0, 8, 109, 121, 95, 116, 97, 98, 108, 101,
1045            ];
1046            let expected = ServerEvent::SchemaChange(SchemaChange {
1047                change_type: SchemaChangeType::Dropped,
1048                target: SchemaChangeTarget::Table,
1049                options: SchemaChangeOptions::TableType(
1050                    "my_ks".to_string(),
1051                    "my_table".to_string(),
1052                ),
1053            });
1054            test_encode_decode(bytes, expected);
1055        }
1056
1057        // type
1058        {
1059            let bytes = &[
1060                // schema change
1061                0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, // dropped
1062                0, 7, 68, 82, 79, 80, 80, 69, 68, // type
1063                0, 4, 84, 89, 80, 69, // my_ks
1064                0, 5, 109, 121, 95, 107, 115, // my_table
1065                0, 8, 109, 121, 95, 116, 97, 98, 108, 101,
1066            ];
1067            let expected = ServerEvent::SchemaChange(SchemaChange {
1068                change_type: SchemaChangeType::Dropped,
1069                target: SchemaChangeTarget::Type,
1070                options: SchemaChangeOptions::TableType(
1071                    "my_ks".to_string(),
1072                    "my_table".to_string(),
1073                ),
1074            });
1075            test_encode_decode(bytes, expected);
1076        }
1077
1078        // function
1079        {
1080            let bytes = &[
1081                // schema change
1082                0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, // dropped
1083                0, 7, 68, 82, 79, 80, 80, 69, 68, // function
1084                0, 8, 70, 85, 78, 67, 84, 73, 79, 78, // my_ks
1085                0, 5, 109, 121, 95, 107, 115, // name
1086                0, 4, 110, 97, 109, 101, // empty list of parameters
1087                0, 0,
1088            ];
1089            let expected = ServerEvent::SchemaChange(SchemaChange {
1090                change_type: SchemaChangeType::Dropped,
1091                target: SchemaChangeTarget::Function,
1092                options: SchemaChangeOptions::FunctionAggregate(
1093                    "my_ks".to_string(),
1094                    "name".to_string(),
1095                    Vec::new(),
1096                ),
1097            });
1098            test_encode_decode(bytes, expected);
1099        }
1100
1101        // function
1102        {
1103            let bytes = &[
1104                // schema change
1105                0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, // dropped
1106                0, 7, 68, 82, 79, 80, 80, 69, 68, // aggregate
1107                0, 9, 65, 71, 71, 82, 69, 71, 65, 84, 69, // my_ks
1108                0, 5, 109, 121, 95, 107, 115, // name
1109                0, 4, 110, 97, 109, 101, // empty list of parameters
1110                0, 0,
1111            ];
1112            let expected = ServerEvent::SchemaChange(SchemaChange {
1113                change_type: SchemaChangeType::Dropped,
1114                target: SchemaChangeTarget::Aggregate,
1115                options: SchemaChangeOptions::FunctionAggregate(
1116                    "my_ks".to_string(),
1117                    "name".to_string(),
1118                    Vec::new(),
1119                ),
1120            });
1121            test_encode_decode(bytes, expected);
1122        }
1123    }
1124}