cassandra_proto/frame/
events.rs

1use std::io::Cursor;
2use std::cmp::PartialEq;
3
4use crate::frame::traits::FromCursor;
5use crate::error;
6use crate::types::{CInet, CString, CStringList};
7
// Event type names; matched against the string read from the frame cursor.
const TOPOLOGY_CHANGE: &str = "TOPOLOGY_CHANGE";
const STATUS_CHANGE: &str = "STATUS_CHANGE";
const SCHEMA_CHANGE: &str = "SCHEMA_CHANGE";

// Topology changes
const NEW_NODE: &str = "NEW_NODE";
const REMOVED_NODE: &str = "REMOVED_NODE";

// Status changes
const UP: &str = "UP";
const DOWN: &str = "DOWN";

// Schema changes
const CREATED: &str = "CREATED";
const UPDATED: &str = "UPDATED";
const DROPPED: &str = "DROPPED";

// Schema change targets
const KEYSPACE: &str = "KEYSPACE";
const TABLE: &str = "TABLE";
const TYPE: &str = "TYPE";
const FUNCTION: &str = "FUNCTION";
const AGGREGATE: &str = "AGGREGATE";
32
/// Simplified `ServerEvent` that does not contain details
/// about a concrete change. It may be useful for subscription
/// when you need only string representation of an event.
///
/// The enum carries no data, so it is `Copy` and can be used as a
/// hash-map / hash-set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SimpleServerEvent {
    /// Counterpart of `ServerEvent::TopologyChange`.
    TopologyChange,
    /// Counterpart of `ServerEvent::StatusChange`.
    StatusChange,
    /// Counterpart of `ServerEvent::SchemaChange`.
    SchemaChange,
}
42
43impl SimpleServerEvent {
44    pub fn as_string(&self) -> String {
45        match *self {
46            SimpleServerEvent::TopologyChange => String::from(TOPOLOGY_CHANGE),
47            SimpleServerEvent::StatusChange => String::from(STATUS_CHANGE),
48            SimpleServerEvent::SchemaChange => String::from(SCHEMA_CHANGE),
49        }
50    }
51}
52
53impl From<ServerEvent> for SimpleServerEvent {
54    fn from(event: ServerEvent) -> SimpleServerEvent {
55        match event {
56            ServerEvent::TopologyChange(_) => SimpleServerEvent::TopologyChange,
57            ServerEvent::StatusChange(_) => SimpleServerEvent::StatusChange,
58            ServerEvent::SchemaChange(_) => SimpleServerEvent::SchemaChange,
59        }
60    }
61}
62
63impl<'a> From<&'a ServerEvent> for SimpleServerEvent {
64    fn from(event: &'a ServerEvent) -> SimpleServerEvent {
65        match *event {
66            ServerEvent::TopologyChange(_) => SimpleServerEvent::TopologyChange,
67            ServerEvent::StatusChange(_) => SimpleServerEvent::StatusChange,
68            ServerEvent::SchemaChange(_) => SimpleServerEvent::SchemaChange,
69        }
70    }
71}
72
73impl PartialEq<ServerEvent> for SimpleServerEvent {
74    fn eq(&self, full_event: &ServerEvent) -> bool {
75        *self == SimpleServerEvent::from(full_event)
76    }
77}
78
/// Full server event that contains all details about a concrete change.
///
/// Each variant wraps the struct holding the decoded event body.
#[derive(Debug)]
pub enum ServerEvent {
    /// Events related to change in the cluster topology
    TopologyChange(TopologyChange),
    /// Events related to change of node status.
    StatusChange(StatusChange),
    /// Events related to schema change.
    SchemaChange(SchemaChange),
}
89
90impl PartialEq<SimpleServerEvent> for ServerEvent {
91    fn eq(&self, event: &SimpleServerEvent) -> bool {
92        SimpleServerEvent::from(self) == *event
93    }
94}
95
96impl FromCursor for ServerEvent {
97    fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<ServerEvent> {
98        let event_type = CString::from_cursor(&mut cursor)?;
99        match event_type.as_str() {
100            TOPOLOGY_CHANGE => {
101                Ok(ServerEvent::TopologyChange(TopologyChange::from_cursor(&mut cursor)?))
102            }
103            STATUS_CHANGE => Ok(ServerEvent::StatusChange(StatusChange::from_cursor(&mut cursor)?)),
104            SCHEMA_CHANGE => Ok(ServerEvent::SchemaChange(SchemaChange::from_cursor(&mut cursor)?)),
105            _ => Err("Unexpected server event".into()),
106        }
107    }
108}
109
/// Events related to change in the cluster topology
///
/// Decoded as a change type followed by an inet address.
#[derive(Debug)]
pub struct TopologyChange {
    /// Whether a node was added or removed.
    pub change_type: TopologyChangeType,
    /// Address read from the event body.
    // NOTE(review): presumably the address of the affected node — confirm against the protocol spec.
    pub addr: CInet,
}
116
117impl FromCursor for TopologyChange {
118    fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<TopologyChange> {
119        let change_type = TopologyChangeType::from_cursor(&mut cursor)?;
120        let addr = CInet::from_cursor(&mut cursor)?;
121
122        Ok(TopologyChange { change_type: change_type,
123                            addr: addr, })
124    }
125}
126
/// Kind of topology change carried by a `TopologyChange` event.
///
/// The enum carries no data, so it is `Copy` and usable as a hash key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TopologyChangeType {
    /// Decoded from the `NEW_NODE` string.
    NewNode,
    /// Decoded from the `REMOVED_NODE` string.
    RemovedNode,
}
132
133impl FromCursor for TopologyChangeType {
134    fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<TopologyChangeType> {
135        CString::from_cursor(&mut cursor).and_then(|tc| match tc.as_str() {
136            NEW_NODE => Ok(TopologyChangeType::NewNode),
137            REMOVED_NODE => Ok(TopologyChangeType::RemovedNode),
138            _ => Err("Unexpected topology change type received from Cluster".into()),
139        })
140    }
141}
142
/// Events related to change of node status.
///
/// Decoded as a change type followed by an inet address.
#[derive(Debug)]
pub struct StatusChange {
    /// Whether the node went up or down.
    pub change_type: StatusChangeType,
    /// Address read from the event body.
    // NOTE(review): presumably the address of the affected node — confirm against the protocol spec.
    pub addr: CInet,
}
149
150impl FromCursor for StatusChange {
151    fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<StatusChange> {
152        let change_type = StatusChangeType::from_cursor(&mut cursor)?;
153        let addr = CInet::from_cursor(&mut cursor)?;
154
155        Ok(StatusChange { change_type: change_type,
156                          addr: addr, })
157    }
158}
159
/// Kind of status change carried by a `StatusChange` event.
///
/// The enum carries no data, so it is `Copy` and usable as a hash key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StatusChangeType {
    /// Decoded from the `UP` string.
    Up,
    /// Decoded from the `DOWN` string.
    Down,
}
165
166impl FromCursor for StatusChangeType {
167    fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<StatusChangeType> {
168        CString::from_cursor(&mut cursor).and_then(|sct| match sct.as_str() {
169            UP => Ok(StatusChangeType::Up),
170            DOWN => Ok(StatusChangeType::Down),
171            _ => Err("Unexpected status change type".into()),
172        })
173    }
174}
175
/// Events related to schema change.
///
/// Decoded in order: the change type, the target, then the
/// target-specific options.
#[derive(Debug, PartialEq)]
pub struct SchemaChange {
    /// Kind of change that was made (created / updated / dropped).
    pub change_type: ChangeType,
    /// Kind of schema object that was changed.
    pub target: Target,
    /// Names identifying the changed object; the variant depends on `target`.
    pub options: ChangeSchemeOptions,
}
183
184impl FromCursor for SchemaChange {
185    fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<SchemaChange> {
186        let change_type = ChangeType::from_cursor(&mut cursor)?;
187        let target = Target::from_cursor(&mut cursor)?;
188        let options = ChangeSchemeOptions::from_cursor_and_target(&mut cursor, &target)?;
189
190        Ok(SchemaChange { change_type: change_type,
191                          target: target,
192                          options: options, })
193    }
194}
195
/// Represents type of changes.
///
/// The enum carries no data, so it is `Copy` and usable as a hash key.
// TODO: rename to SchemaChangeType
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChangeType {
    /// Decoded from the `CREATED` string.
    Created,
    /// Decoded from the `UPDATED` string.
    Updated,
    /// Decoded from the `DROPPED` string.
    Dropped,
}
204
205impl FromCursor for ChangeType {
206    fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<ChangeType> {
207        CString::from_cursor(&mut cursor).and_then(|ct| match ct.as_str() {
208            CREATED => Ok(ChangeType::Created),
209            UPDATED => Ok(ChangeType::Updated),
210            DROPPED => Ok(ChangeType::Dropped),
211            _ => Err("Unexpected schema change type".into()),
212        })
213    }
214}
215
/// Refers to the target of the changes that were made.
///
/// The enum carries no data, so it is `Copy` and usable as a hash key.
// TODO: rename to SchemaChangeTarget
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Target {
    /// Decoded from the `KEYSPACE` string.
    Keyspace,
    /// Decoded from the `TABLE` string.
    Table,
    /// Decoded from the `TYPE` string.
    Type,
    /// Decoded from the `FUNCTION` string.
    Function,
    /// Decoded from the `AGGREGATE` string.
    Aggregate,
}
226
227impl FromCursor for Target {
228    fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<Target> {
229        CString::from_cursor(&mut cursor).and_then(|t| match t.as_str() {
230            KEYSPACE => Ok(Target::Keyspace),
231            TABLE => Ok(Target::Table),
232            TYPE => Ok(Target::Type),
233            FUNCTION => Ok(Target::Function),
234            AGGREGATE => Ok(Target::Aggregate),
235            _ => Err("Unexpected schema change target".into()),
236        })
237    }
238}
239
/// Options that contain information about the changes that were made.
///
/// Which variant is produced depends on the schema change target.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChangeSchemeOptions {
    /// Changes related to keyspaces. Contains keyspace name.
    Keyspace(String),
    /// Changes related to tables and types. Contains the keyspace name and
    /// the table / type name.
    TableType((String, String)),
    /// Changes related to functions and aggregations. Contains:
    /// * keyspace containing the user defined function / aggregate
    /// * the function/aggregate name
    /// * list of strings, one string for each argument type (as CQL type)
    FunctionAggregate((String, String, Vec<String>)),
}
253
254impl ChangeSchemeOptions {
255    fn from_cursor_and_target(mut cursor: &mut Cursor<&[u8]>,
256                              target: &Target)
257                              -> error::Result<ChangeSchemeOptions> {
258        Ok(match *target {
259            Target::Keyspace => ChangeSchemeOptions::from_cursor_keyspace(&mut cursor)?,
260            Target::Table | Target::Type => {
261                ChangeSchemeOptions::from_cursor_table_type(&mut cursor)?
262            }
263            Target::Function | Target::Aggregate => {
264                ChangeSchemeOptions::from_cursor_function_aggregate(&mut cursor)?
265            }
266        })
267    }
268
269    fn from_cursor_keyspace(mut cursor: &mut Cursor<&[u8]>) -> error::Result<ChangeSchemeOptions> {
270        Ok(ChangeSchemeOptions::Keyspace(CString::from_cursor(&mut cursor)?.into_plain()))
271    }
272
273    fn from_cursor_table_type(mut cursor: &mut Cursor<&[u8]>)
274                              -> error::Result<ChangeSchemeOptions> {
275        let keyspace = CString::from_cursor(&mut cursor)?.into_plain();
276        let name = CString::from_cursor(&mut cursor)?.into_plain();
277        Ok(ChangeSchemeOptions::TableType((keyspace, name)))
278    }
279
280    fn from_cursor_function_aggregate(mut cursor: &mut Cursor<&[u8]>)
281                                      -> error::Result<ChangeSchemeOptions> {
282        let keyspace = CString::from_cursor(&mut cursor)?.into_plain();
283        let name = CString::from_cursor(&mut cursor)?.into_plain();
284        let types = CStringList::from_cursor(&mut cursor)?.into_plain();
285        Ok(ChangeSchemeOptions::FunctionAggregate((keyspace,
286                                                  name,
287                                                  types)))
288    }
289}
290
#[cfg(test)]
mod simple_server_event_test {
    use super::*;

    /// Every simple event renders as its event type name.
    #[test]
    fn as_string() {
        let topology = SimpleServerEvent::TopologyChange.as_string();
        assert_eq!(topology, String::from("TOPOLOGY_CHANGE"));

        let status = SimpleServerEvent::StatusChange.as_string();
        assert_eq!(status, String::from("STATUS_CHANGE"));

        let schema = SimpleServerEvent::SchemaChange.as_string();
        assert_eq!(schema, String::from("SCHEMA_CHANGE"));
    }
}
305
#[cfg(test)]
mod topology_change_type_test {
    use super::*;
    use std::io::Cursor;
    use crate::frame::traits::FromCursor;

    /// Decodes a `TopologyChangeType` from raw bytes, panicking on failure.
    fn decode(bytes: &[u8]) -> TopologyChangeType {
        let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
        TopologyChangeType::from_cursor(&mut cursor).unwrap()
    }

    #[test]
    fn from_cursor() {
        // Bytes 0, 8 followed by the ASCII text "NEW_NODE".
        assert_eq!(decode(b"\x00\x08NEW_NODE"), TopologyChangeType::NewNode);

        // Bytes 0, 12 followed by the ASCII text "REMOVED_NODE".
        assert_eq!(decode(b"\x00\x0cREMOVED_NODE"),
                   TopologyChangeType::RemovedNode);
    }

    #[test]
    #[should_panic]
    fn from_cursor_wrong() {
        // "N" is not a known topology change type.
        let _ = decode(b"\x00\x01N");
    }
}
333
#[cfg(test)]
mod status_change_type_test {
    use super::*;
    use std::io::Cursor;
    use crate::frame::traits::FromCursor;

    /// Decodes a `StatusChangeType` from raw bytes, panicking on failure.
    fn decode(bytes: &[u8]) -> StatusChangeType {
        let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
        StatusChangeType::from_cursor(&mut cursor).unwrap()
    }

    #[test]
    fn from_cursor() {
        // Bytes 0, 2 followed by the ASCII text "UP".
        assert_eq!(decode(b"\x00\x02UP"), StatusChangeType::Up);

        // Bytes 0, 4 followed by the ASCII text "DOWN".
        assert_eq!(decode(b"\x00\x04DOWN"), StatusChangeType::Down);
    }

    #[test]
    #[should_panic]
    fn from_cursor_wrong() {
        // "N" is not a known status change type.
        let _ = decode(b"\x00\x01N");
    }
}
361
#[cfg(test)]
mod schema_change_type_test {
    use super::*;
    use std::io::Cursor;
    use crate::frame::traits::FromCursor;

    /// Decodes a `ChangeType` from raw bytes, panicking on failure.
    fn decode(bytes: &[u8]) -> ChangeType {
        let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
        ChangeType::from_cursor(&mut cursor).unwrap()
    }

    #[test]
    fn from_cursor() {
        // Each input is the bytes 0, 7 followed by a 7-character ASCII name.
        assert_eq!(decode(b"\x00\x07CREATED"), ChangeType::Created);
        assert_eq!(decode(b"\x00\x07UPDATED"), ChangeType::Updated);
        assert_eq!(decode(b"\x00\x07DROPPED"), ChangeType::Dropped);
    }

    #[test]
    #[should_panic]
    fn from_cursor_wrong() {
        // "N" is not a known schema change type.
        let _ = decode(b"\x00\x01N");
    }
}
394
#[cfg(test)]
mod schema_change_target_test {
    use super::*;
    use std::io::Cursor;
    use crate::frame::traits::FromCursor;

    #[test]
    fn from_cursor() {
        // (input bytes, expected target), checked in the listed order.
        let cases: [(&[u8], Target); 5] = [
            (&b"\x00\x08KEYSPACE"[..], Target::Keyspace),
            (&b"\x00\x05TABLE"[..], Target::Table),
            (&b"\x00\x04TYPE"[..], Target::Type),
            (&b"\x00\x08FUNCTION"[..], Target::Function),
            (&b"\x00\x09AGGREGATE"[..], Target::Aggregate),
        ];

        for &(bytes, ref expected) in cases.iter() {
            let mut cursor: Cursor<&[u8]> = Cursor::new(bytes);
            assert_eq!(Target::from_cursor(&mut cursor).unwrap(), *expected);
        }
    }

    #[test]
    #[should_panic]
    fn from_cursor_wrong() {
        // "N" is not a known schema change target.
        let bytes: &[u8] = b"\x00\x01N";
        let mut wrong: Cursor<&[u8]> = Cursor::new(bytes);
        let _ = Target::from_cursor(&mut wrong).unwrap();
    }
}
435
436#[cfg(test)]
437mod server_event {
438    use super::*;
439    use std::io::Cursor;
440    use crate::frame::traits::FromCursor;
441
442    #[test]
443    fn topology_change_new_node() {
444        let bytes = &[// topology change
445                      0,
446                      15,
447                      84,
448                      79,
449                      80,
450                      79,
451                      76,
452                      79,
453                      71,
454                      89,
455                      95,
456                      67,
457                      72,
458                      65,
459                      78,
460                      71,
461                      69,
462                      // new node
463                      0,
464                      8,
465                      78,
466                      69,
467                      87,
468                      95,
469                      78,
470                      79,
471                      68,
472                      69,
473                      // 127.0.0.1:1
474                      0,
475                      4,
476                      127,
477                      0,
478                      0,
479                      1,
480                      0,
481                      0,
482                      0,
483                      1];
484        let mut c: Cursor<&[u8]> = Cursor::new(bytes);
485        let event = ServerEvent::from_cursor(&mut c).unwrap();
486        match event {
487            ServerEvent::TopologyChange(ref tc) => {
488                assert_eq!(tc.change_type, TopologyChangeType::NewNode);
489                assert_eq!(format!("{:?}", tc.addr.addr), "V4(127.0.0.1:1)");
490            }
491            _ => panic!("should be topology change event"),
492        }
493    }
494
495    #[test]
496    fn topology_change_removed_node() {
497        let bytes = &[// topology change
498                      0,
499                      15,
500                      84,
501                      79,
502                      80,
503                      79,
504                      76,
505                      79,
506                      71,
507                      89,
508                      95,
509                      67,
510                      72,
511                      65,
512                      78,
513                      71,
514                      69,
515                      // removed node
516                      0,
517                      12,
518                      82,
519                      69,
520                      77,
521                      79,
522                      86,
523                      69,
524                      68,
525                      95,
526                      78,
527                      79,
528                      68,
529                      69,
530                      // 127.0.0.1:1
531                      0,
532                      4,
533                      127,
534                      0,
535                      0,
536                      1,
537                      0,
538                      0,
539                      0,
540                      1];
541        let mut c: Cursor<&[u8]> = Cursor::new(bytes);
542        let event = ServerEvent::from_cursor(&mut c).unwrap();
543        match event {
544            ServerEvent::TopologyChange(ref tc) => {
545                assert_eq!(tc.change_type, TopologyChangeType::RemovedNode);
546                assert_eq!(format!("{:?}", tc.addr.addr), "V4(127.0.0.1:1)");
547            }
548            _ => panic!("should be topology change event"),
549        }
550    }
551
552    #[test]
553    fn status_change_up() {
554        let bytes = &[// status change
555                      0,
556                      13,
557                      83,
558                      84,
559                      65,
560                      84,
561                      85,
562                      83,
563                      95,
564                      67,
565                      72,
566                      65,
567                      78,
568                      71,
569                      69,
570                      // up
571                      0,
572                      2,
573                      85,
574                      80,
575                      // 127.0.0.1:1
576                      0,
577                      4,
578                      127,
579                      0,
580                      0,
581                      1,
582                      0,
583                      0,
584                      0,
585                      1];
586        let mut c: Cursor<&[u8]> = Cursor::new(bytes);
587        let event = ServerEvent::from_cursor(&mut c).unwrap();
588        match event {
589            ServerEvent::StatusChange(ref tc) => {
590                assert_eq!(tc.change_type, StatusChangeType::Up);
591                assert_eq!(format!("{:?}", tc.addr.addr), "V4(127.0.0.1:1)");
592            }
593            _ => panic!("should be status change up"),
594        }
595    }
596
597    #[test]
598    fn status_change_down() {
599        let bytes = &[// status change
600                      0,
601                      13,
602                      83,
603                      84,
604                      65,
605                      84,
606                      85,
607                      83,
608                      95,
609                      67,
610                      72,
611                      65,
612                      78,
613                      71,
614                      69,
615                      // down
616                      0,
617                      4,
618                      68,
619                      79,
620                      87,
621                      78,
622                      // 127.0.0.1:1
623                      0,
624                      4,
625                      127,
626                      0,
627                      0,
628                      1,
629                      0,
630                      0,
631                      0,
632                      1];
633        let mut c: Cursor<&[u8]> = Cursor::new(bytes);
634        let event = ServerEvent::from_cursor(&mut c).unwrap();
635        match event {
636            ServerEvent::StatusChange(ref tc) => {
637                assert_eq!(tc.change_type, StatusChangeType::Down);
638                assert_eq!(format!("{:?}", tc.addr.addr), "V4(127.0.0.1:1)");
639            }
640            _ => panic!("should be status change down"),
641        }
642    }
643
644    #[test]
645    fn schema_change_created() {
646        // keyspace
647        let keyspace = &[// schema change
648                         0,
649                         13,
650                         83,
651                         67,
652                         72,
653                         69,
654                         77,
655                         65,
656                         95,
657                         67,
658                         72,
659                         65,
660                         78,
661                         71,
662                         69,
663                         // created
664                         0,
665                         7,
666                         67,
667                         82,
668                         69,
669                         65,
670                         84,
671                         69,
672                         68,
673                         // keyspace
674                         0,
675                         8,
676                         75,
677                         69,
678                         89,
679                         83,
680                         80,
681                         65,
682                         67,
683                         69,
684                         // my_ks
685                         0,
686                         5,
687                         109,
688                         121,
689                         95,
690                         107,
691                         115];
692        let mut ks: Cursor<&[u8]> = Cursor::new(keyspace);
693        let ks_event = ServerEvent::from_cursor(&mut ks).unwrap();
694        match ks_event {
695            ServerEvent::SchemaChange(ref _c) => {
696                assert_eq!(_c.change_type, ChangeType::Created);
697                assert_eq!(_c.target, Target::Keyspace);
698                match _c.options {
699                    ChangeSchemeOptions::Keyspace(ref ks) => assert_eq!(ks.as_str(), "my_ks"),
700                    _ => panic!("should be keyspace"),
701                }
702            }
703            _ => panic!("should be schema change"),
704        }
705        // table
706        let table = &[// schema change
707                      0,
708                      13,
709                      83,
710                      67,
711                      72,
712                      69,
713                      77,
714                      65,
715                      95,
716                      67,
717                      72,
718                      65,
719                      78,
720                      71,
721                      69,
722                      // created
723                      0,
724                      7,
725                      67,
726                      82,
727                      69,
728                      65,
729                      84,
730                      69,
731                      68,
732                      // table
733                      0,
734                      5,
735                      84,
736                      65,
737                      66,
738                      76,
739                      69,
740                      // my_ks
741                      0,
742                      5,
743                      109,
744                      121,
745                      95,
746                      107,
747                      115,
748                      // my_table
749                      0,
750                      8,
751                      109,
752                      121,
753                      95,
754                      116,
755                      97,
756                      98,
757                      108,
758                      101];
759        let mut tb: Cursor<&[u8]> = Cursor::new(table);
760        let tb_event = ServerEvent::from_cursor(&mut tb).unwrap();
761        match tb_event {
762            ServerEvent::SchemaChange(ref _c) => {
763                assert_eq!(_c.change_type, ChangeType::Created);
764                assert_eq!(_c.target, Target::Table);
765                match _c.options {
766                    ChangeSchemeOptions::TableType(ref tt) => {
767                        assert_eq!(tt, &("my_ks".to_string(), "my_table".to_string()))
768                    }
769                    _ => panic!("should be table"),
770                }
771            }
772            _ => panic!("should be schema change"),
773        }
774        // type
775        let _type = &[// schema change
776                      0,
777                      13,
778                      83,
779                      67,
780                      72,
781                      69,
782                      77,
783                      65,
784                      95,
785                      67,
786                      72,
787                      65,
788                      78,
789                      71,
790                      69,
791                      // created
792                      0,
793                      7,
794                      67,
795                      82,
796                      69,
797                      65,
798                      84,
799                      69,
800                      68,
801                      // type
802                      0,
803                      4,
804                      84,
805                      89,
806                      80,
807                      69,
808                      // my_ks
809                      0,
810                      5,
811                      109,
812                      121,
813                      95,
814                      107,
815                      115,
816                      // my_table
817                      0,
818                      8,
819                      109,
820                      121,
821                      95,
822                      116,
823                      97,
824                      98,
825                      108,
826                      101];
827        let mut tp: Cursor<&[u8]> = Cursor::new(_type);
828        let tp_event = ServerEvent::from_cursor(&mut tp).unwrap();
829        match tp_event {
830            ServerEvent::SchemaChange(ref _c) => {
831                assert_eq!(_c.change_type, ChangeType::Created);
832                assert_eq!(_c.target, Target::Type);
833                match _c.options {
834                    ChangeSchemeOptions::TableType(ref tt) => {
835                        assert_eq!(tt, &("my_ks".to_string(), "my_table".to_string()))
836                    }
837                    _ => panic!("should be type"),
838                }
839            }
840            _ => panic!("should be schema change"),
841        }
842        // function
843        let function = &[// schema change
844                         0,
845                         13,
846                         83,
847                         67,
848                         72,
849                         69,
850                         77,
851                         65,
852                         95,
853                         67,
854                         72,
855                         65,
856                         78,
857                         71,
858                         69,
859                         // created
860                         0,
861                         7,
862                         67,
863                         82,
864                         69,
865                         65,
866                         84,
867                         69,
868                         68,
869                         // function
870                         0,
871                         8,
872                         70,
873                         85,
874                         78,
875                         67,
876                         84,
877                         73,
878                         79,
879                         78,
880                         // my_ks
881                         0,
882                         5,
883                         109,
884                         121,
885                         95,
886                         107,
887                         115,
888                         // name
889                         0,
890                         4,
891                         110,
892                         97,
893                         109,
894                         101,
895                         // empty list of parameters
896                         0,
897                         0];
898        let mut fnct: Cursor<&[u8]> = Cursor::new(function);
899        let fnct_event = ServerEvent::from_cursor(&mut fnct).unwrap();
900        match fnct_event {
901            ServerEvent::SchemaChange(ref _c) => {
902                assert_eq!(_c.change_type, ChangeType::Created);
903                assert_eq!(_c.target, Target::Function);
904                match _c.options {
905                    ChangeSchemeOptions::FunctionAggregate(ref tt) => {
906                        assert_eq!(tt, &("my_ks".to_string(), "name".to_string(), vec![]))
907                    }
908                    _ => panic!("should be function"),
909                }
910            }
911            _ => panic!("should be schema change"),
912        }
913        // function
914        let aggregate = &[// schema change
915                          0,
916                          13,
917                          83,
918                          67,
919                          72,
920                          69,
921                          77,
922                          65,
923                          95,
924                          67,
925                          72,
926                          65,
927                          78,
928                          71,
929                          69,
930                          // created
931                          0,
932                          7,
933                          67,
934                          82,
935                          69,
936                          65,
937                          84,
938                          69,
939                          68,
940                          // aggregate
941                          0,
942                          9,
943                          65,
944                          71,
945                          71,
946                          82,
947                          69,
948                          71,
949                          65,
950                          84,
951                          69,
952                          // my_ks
953                          0,
954                          5,
955                          109,
956                          121,
957                          95,
958                          107,
959                          115,
960                          // name
961                          0,
962                          4,
963                          110,
964                          97,
965                          109,
966                          101,
967                          // empty list of parameters
968                          0,
969                          0];
970        let mut aggr: Cursor<&[u8]> = Cursor::new(aggregate);
971        let aggr_event = ServerEvent::from_cursor(&mut aggr).unwrap();
972        match aggr_event {
973            ServerEvent::SchemaChange(ref _c) => {
974                assert_eq!(_c.change_type, ChangeType::Created);
975                assert_eq!(_c.target, Target::Aggregate);
976                match _c.options {
977                    ChangeSchemeOptions::FunctionAggregate(ref tt) => {
978                        assert_eq!(tt, &("my_ks".to_string(), "name".to_string(), vec![]))
979                    }
980                    _ => panic!("should be aggregate"),
981                }
982            }
983            _ => panic!("should be schema change"),
984        }
985    }
986
987    #[test]
988    fn schema_change_updated() {
989        // keyspace
990        let keyspace = &[// schema change
991                         0,
992                         13,
993                         83,
994                         67,
995                         72,
996                         69,
997                         77,
998                         65,
999                         95,
1000                         67,
1001                         72,
1002                         65,
1003                         78,
1004                         71,
1005                         69,
1006                         // updated
1007                         0,
1008                         7,
1009                         85,
1010                         80,
1011                         68,
1012                         65,
1013                         84,
1014                         69,
1015                         68,
1016                         // keyspace
1017                         0,
1018                         8,
1019                         75,
1020                         69,
1021                         89,
1022                         83,
1023                         80,
1024                         65,
1025                         67,
1026                         69,
1027                         // my_ks
1028                         0,
1029                         5,
1030                         109,
1031                         121,
1032                         95,
1033                         107,
1034                         115];
1035        let mut ks: Cursor<&[u8]> = Cursor::new(keyspace);
1036        let ks_event = ServerEvent::from_cursor(&mut ks).unwrap();
1037        match ks_event {
1038            ServerEvent::SchemaChange(ref _c) => {
1039                assert_eq!(_c.change_type, ChangeType::Updated);
1040                assert_eq!(_c.target, Target::Keyspace);
1041                match _c.options {
1042                    ChangeSchemeOptions::Keyspace(ref ks) => assert_eq!(ks.as_str(), "my_ks"),
1043                    _ => panic!("should be keyspace"),
1044                }
1045            }
1046            _ => panic!("should be schema change"),
1047        }
1048        // table
1049        let table = &[// schema change
1050                      0,
1051                      13,
1052                      83,
1053                      67,
1054                      72,
1055                      69,
1056                      77,
1057                      65,
1058                      95,
1059                      67,
1060                      72,
1061                      65,
1062                      78,
1063                      71,
1064                      69,
1065                      // updated
1066                      0,
1067                      7,
1068                      85,
1069                      80,
1070                      68,
1071                      65,
1072                      84,
1073                      69,
1074                      68,
1075                      // table
1076                      0,
1077                      5,
1078                      84,
1079                      65,
1080                      66,
1081                      76,
1082                      69,
1083                      // my_ks
1084                      0,
1085                      5,
1086                      109,
1087                      121,
1088                      95,
1089                      107,
1090                      115,
1091                      // my_table
1092                      0,
1093                      8,
1094                      109,
1095                      121,
1096                      95,
1097                      116,
1098                      97,
1099                      98,
1100                      108,
1101                      101];
1102        let mut tb: Cursor<&[u8]> = Cursor::new(table);
1103        let tb_event = ServerEvent::from_cursor(&mut tb).unwrap();
1104        match tb_event {
1105            ServerEvent::SchemaChange(ref _c) => {
1106                assert_eq!(_c.change_type, ChangeType::Updated);
1107                assert_eq!(_c.target, Target::Table);
1108                match _c.options {
1109                    ChangeSchemeOptions::TableType(ref tt) => {
1110                        assert_eq!(tt, &("my_ks".to_string(), "my_table".to_string()))
1111                    }
1112                    _ => panic!("should be table"),
1113                }
1114            }
1115            _ => panic!("should be schema change"),
1116        }
1117        // type
1118        let _type = &[// schema change
1119                      0,
1120                      13,
1121                      83,
1122                      67,
1123                      72,
1124                      69,
1125                      77,
1126                      65,
1127                      95,
1128                      67,
1129                      72,
1130                      65,
1131                      78,
1132                      71,
1133                      69,
1134                      // updated
1135                      0,
1136                      7,
1137                      85,
1138                      80,
1139                      68,
1140                      65,
1141                      84,
1142                      69,
1143                      68,
1144                      // type
1145                      0,
1146                      4,
1147                      84,
1148                      89,
1149                      80,
1150                      69,
1151                      // my_ks
1152                      0,
1153                      5,
1154                      109,
1155                      121,
1156                      95,
1157                      107,
1158                      115,
1159                      // my_table
1160                      0,
1161                      8,
1162                      109,
1163                      121,
1164                      95,
1165                      116,
1166                      97,
1167                      98,
1168                      108,
1169                      101];
1170        let mut tp: Cursor<&[u8]> = Cursor::new(_type);
1171        let tp_event = ServerEvent::from_cursor(&mut tp).unwrap();
1172        match tp_event {
1173            ServerEvent::SchemaChange(ref _c) => {
1174                assert_eq!(_c.change_type, ChangeType::Updated);
1175                assert_eq!(_c.target, Target::Type);
1176                match _c.options {
1177                    ChangeSchemeOptions::TableType(ref tt) => {
1178                        assert_eq!(tt, &("my_ks".to_string(), "my_table".to_string()))
1179                    }
1180                    _ => panic!("should be type"),
1181                }
1182            }
1183            _ => panic!("should be schema change"),
1184        }
1185        // function
1186        let function = &[// schema change
1187                         0,
1188                         13,
1189                         83,
1190                         67,
1191                         72,
1192                         69,
1193                         77,
1194                         65,
1195                         95,
1196                         67,
1197                         72,
1198                         65,
1199                         78,
1200                         71,
1201                         69,
1202                         // updated
1203                         0,
1204                         7,
1205                         85,
1206                         80,
1207                         68,
1208                         65,
1209                         84,
1210                         69,
1211                         68,
1212                         // function
1213                         0,
1214                         8,
1215                         70,
1216                         85,
1217                         78,
1218                         67,
1219                         84,
1220                         73,
1221                         79,
1222                         78,
1223                         // my_ks
1224                         0,
1225                         5,
1226                         109,
1227                         121,
1228                         95,
1229                         107,
1230                         115,
1231                         // name
1232                         0,
1233                         4,
1234                         110,
1235                         97,
1236                         109,
1237                         101,
1238                         // empty list of parameters
1239                         0,
1240                         0];
1241        let mut fnct: Cursor<&[u8]> = Cursor::new(function);
1242        let fnct_event = ServerEvent::from_cursor(&mut fnct).unwrap();
1243        match fnct_event {
1244            ServerEvent::SchemaChange(ref _c) => {
1245                assert_eq!(_c.change_type, ChangeType::Updated);
1246                assert_eq!(_c.target, Target::Function);
1247                match _c.options {
1248                    ChangeSchemeOptions::FunctionAggregate(ref tt) => {
1249                        assert_eq!(tt, &("my_ks".to_string(), "name".to_string(), vec![]))
1250                    }
1251                    _ => panic!("should be function"),
1252                }
1253            }
1254            _ => panic!("should be schema change"),
1255        }
1256        // function
1257        let aggregate = &[// schema change
1258                          0,
1259                          13,
1260                          83,
1261                          67,
1262                          72,
1263                          69,
1264                          77,
1265                          65,
1266                          95,
1267                          67,
1268                          72,
1269                          65,
1270                          78,
1271                          71,
1272                          69,
1273                          // updated
1274                          0,
1275                          7,
1276                          85,
1277                          80,
1278                          68,
1279                          65,
1280                          84,
1281                          69,
1282                          68,
1283                          // aggregate
1284                          0,
1285                          9,
1286                          65,
1287                          71,
1288                          71,
1289                          82,
1290                          69,
1291                          71,
1292                          65,
1293                          84,
1294                          69,
1295                          // my_ks
1296                          0,
1297                          5,
1298                          109,
1299                          121,
1300                          95,
1301                          107,
1302                          115,
1303                          // name
1304                          0,
1305                          4,
1306                          110,
1307                          97,
1308                          109,
1309                          101,
1310                          // empty list of parameters
1311                          0,
1312                          0];
1313        let mut aggr: Cursor<&[u8]> = Cursor::new(aggregate);
1314        let aggr_event = ServerEvent::from_cursor(&mut aggr).unwrap();
1315        match aggr_event {
1316            ServerEvent::SchemaChange(ref _c) => {
1317                assert_eq!(_c.change_type, ChangeType::Updated);
1318                assert_eq!(_c.target, Target::Aggregate);
1319                match _c.options {
1320                    ChangeSchemeOptions::FunctionAggregate(ref tt) => {
1321                        assert_eq!(tt, &("my_ks".to_string(), "name".to_string(), vec![]))
1322                    }
1323                    _ => panic!("should be aggregate"),
1324                }
1325            }
1326            _ => panic!("should be schema change"),
1327        }
1328    }
1329
1330    #[test]
1331    fn schema_change_dropped() {
1332        // keyspace
1333        let keyspace = &[// schema change
1334                         0,
1335                         13,
1336                         83,
1337                         67,
1338                         72,
1339                         69,
1340                         77,
1341                         65,
1342                         95,
1343                         67,
1344                         72,
1345                         65,
1346                         78,
1347                         71,
1348                         69,
1349                         // dropped
1350                         0,
1351                         7,
1352                         68,
1353                         82,
1354                         79,
1355                         80,
1356                         80,
1357                         69,
1358                         68,
1359                         // keyspace
1360                         0,
1361                         8,
1362                         75,
1363                         69,
1364                         89,
1365                         83,
1366                         80,
1367                         65,
1368                         67,
1369                         69,
1370                         // my_ks
1371                         0,
1372                         5,
1373                         109,
1374                         121,
1375                         95,
1376                         107,
1377                         115];
1378        let mut ks: Cursor<&[u8]> = Cursor::new(keyspace);
1379        let ks_event = ServerEvent::from_cursor(&mut ks).unwrap();
1380        match ks_event {
1381            ServerEvent::SchemaChange(ref _c) => {
1382                assert_eq!(_c.change_type, ChangeType::Dropped);
1383                assert_eq!(_c.target, Target::Keyspace);
1384                match _c.options {
1385                    ChangeSchemeOptions::Keyspace(ref ks) => assert_eq!(ks.as_str(), "my_ks"),
1386                    _ => panic!("should be keyspace"),
1387                }
1388            }
1389            _ => panic!("should be schema change"),
1390        }
1391        // table
1392        let table = &[// schema change
1393                      0,
1394                      13,
1395                      83,
1396                      67,
1397                      72,
1398                      69,
1399                      77,
1400                      65,
1401                      95,
1402                      67,
1403                      72,
1404                      65,
1405                      78,
1406                      71,
1407                      69,
1408                      // dropped
1409                      0,
1410                      7,
1411                      68,
1412                      82,
1413                      79,
1414                      80,
1415                      80,
1416                      69,
1417                      68,
1418                      // table
1419                      0,
1420                      5,
1421                      84,
1422                      65,
1423                      66,
1424                      76,
1425                      69,
1426                      // my_ks
1427                      0,
1428                      5,
1429                      109,
1430                      121,
1431                      95,
1432                      107,
1433                      115,
1434                      // my_table
1435                      0,
1436                      8,
1437                      109,
1438                      121,
1439                      95,
1440                      116,
1441                      97,
1442                      98,
1443                      108,
1444                      101];
1445        let mut tb: Cursor<&[u8]> = Cursor::new(table);
1446        let tb_event = ServerEvent::from_cursor(&mut tb).unwrap();
1447        match tb_event {
1448            ServerEvent::SchemaChange(ref _c) => {
1449                assert_eq!(_c.change_type, ChangeType::Dropped);
1450                assert_eq!(_c.target, Target::Table);
1451                match _c.options {
1452                    ChangeSchemeOptions::TableType(ref tt) => {
1453                        assert_eq!(tt, &("my_ks".to_string(), "my_table".to_string()))
1454                    }
1455                    _ => panic!("should be table"),
1456                }
1457            }
1458            _ => panic!("should be schema change"),
1459        }
1460        // type
1461        let _type = &[// schema change
1462                      0,
1463                      13,
1464                      83,
1465                      67,
1466                      72,
1467                      69,
1468                      77,
1469                      65,
1470                      95,
1471                      67,
1472                      72,
1473                      65,
1474                      78,
1475                      71,
1476                      69,
1477                      // dropped
1478                      0,
1479                      7,
1480                      68,
1481                      82,
1482                      79,
1483                      80,
1484                      80,
1485                      69,
1486                      68,
1487                      // type
1488                      0,
1489                      4,
1490                      84,
1491                      89,
1492                      80,
1493                      69,
1494                      // my_ks
1495                      0,
1496                      5,
1497                      109,
1498                      121,
1499                      95,
1500                      107,
1501                      115,
1502                      // my_table
1503                      0,
1504                      8,
1505                      109,
1506                      121,
1507                      95,
1508                      116,
1509                      97,
1510                      98,
1511                      108,
1512                      101];
1513        let mut tp: Cursor<&[u8]> = Cursor::new(_type);
1514        let tp_event = ServerEvent::from_cursor(&mut tp).unwrap();
1515        match tp_event {
1516            ServerEvent::SchemaChange(ref _c) => {
1517                assert_eq!(_c.change_type, ChangeType::Dropped);
1518                assert_eq!(_c.target, Target::Type);
1519                match _c.options {
1520                    ChangeSchemeOptions::TableType(ref tt) => {
1521                        assert_eq!(tt, &("my_ks".to_string(), "my_table".to_string()))
1522                    }
1523                    _ => panic!("should be type"),
1524                }
1525            }
1526            _ => panic!("should be schema change"),
1527        }
1528        // function
1529        let function = &[// schema change
1530                         0,
1531                         13,
1532                         83,
1533                         67,
1534                         72,
1535                         69,
1536                         77,
1537                         65,
1538                         95,
1539                         67,
1540                         72,
1541                         65,
1542                         78,
1543                         71,
1544                         69,
1545                         // dropped
1546                         0,
1547                         7,
1548                         68,
1549                         82,
1550                         79,
1551                         80,
1552                         80,
1553                         69,
1554                         68,
1555                         // function
1556                         0,
1557                         8,
1558                         70,
1559                         85,
1560                         78,
1561                         67,
1562                         84,
1563                         73,
1564                         79,
1565                         78,
1566                         // my_ks
1567                         0,
1568                         5,
1569                         109,
1570                         121,
1571                         95,
1572                         107,
1573                         115,
1574                         // name
1575                         0,
1576                         4,
1577                         110,
1578                         97,
1579                         109,
1580                         101,
1581                         // empty list of parameters
1582                         0,
1583                         0];
1584        let mut fnct: Cursor<&[u8]> = Cursor::new(function);
1585        let fnct_event = ServerEvent::from_cursor(&mut fnct).unwrap();
1586        match fnct_event {
1587            ServerEvent::SchemaChange(ref _c) => {
1588                assert_eq!(_c.change_type, ChangeType::Dropped);
1589                assert_eq!(_c.target, Target::Function);
1590                match _c.options {
1591                    ChangeSchemeOptions::FunctionAggregate(ref tt) => {
1592                        assert_eq!(tt, &("my_ks".to_string(), "name".to_string(), vec![]))
1593                    }
1594                    _ => panic!("should be function"),
1595                }
1596            }
1597            _ => panic!("should be schema change"),
1598        }
1599        // function
1600        let aggregate = &[// schema change
1601                          0,
1602                          13,
1603                          83,
1604                          67,
1605                          72,
1606                          69,
1607                          77,
1608                          65,
1609                          95,
1610                          67,
1611                          72,
1612                          65,
1613                          78,
1614                          71,
1615                          69,
1616                          // dropped
1617                          0,
1618                          7,
1619                          68,
1620                          82,
1621                          79,
1622                          80,
1623                          80,
1624                          69,
1625                          68,
1626                          // aggregate
1627                          0,
1628                          9,
1629                          65,
1630                          71,
1631                          71,
1632                          82,
1633                          69,
1634                          71,
1635                          65,
1636                          84,
1637                          69,
1638                          // my_ks
1639                          0,
1640                          5,
1641                          109,
1642                          121,
1643                          95,
1644                          107,
1645                          115,
1646                          // name
1647                          0,
1648                          4,
1649                          110,
1650                          97,
1651                          109,
1652                          101,
1653                          // empty list of parameters
1654                          0,
1655                          0];
1656        let mut aggr: Cursor<&[u8]> = Cursor::new(aggregate);
1657        let aggr_event = ServerEvent::from_cursor(&mut aggr).unwrap();
1658        match aggr_event {
1659            ServerEvent::SchemaChange(ref _c) => {
1660                assert_eq!(_c.change_type, ChangeType::Dropped);
1661                assert_eq!(_c.target, Target::Aggregate);
1662                match _c.options {
1663                    ChangeSchemeOptions::FunctionAggregate(ref tt) => {
1664                        assert_eq!(tt, &("my_ks".to_string(), "name".to_string(), vec![]))
1665                    }
1666                    _ => panic!("should be aggregate"),
1667                }
1668            }
1669            _ => panic!("should be schema change"),
1670        }
1671    }
1672}