1use std::io::Cursor;
2use std::cmp::PartialEq;
3
4use crate::frame::traits::FromCursor;
5use crate::error;
6use crate::types::{CInet, CString, CStringList};
7
8const TOPOLOGY_CHANGE: &'static str = "TOPOLOGY_CHANGE";
10const STATUS_CHANGE: &'static str = "STATUS_CHANGE";
11const SCHEMA_CHANGE: &'static str = "SCHEMA_CHANGE";
12
13const NEW_NODE: &'static str = "NEW_NODE";
15const REMOVED_NODE: &'static str = "REMOVED_NODE";
16
17const UP: &'static str = "UP";
19const DOWN: &'static str = "DOWN";
20
21const CREATED: &'static str = "CREATED";
23const UPDATED: &'static str = "UPDATED";
24const DROPPED: &'static str = "DROPPED";
25
26const KEYSPACE: &'static str = "KEYSPACE";
28const TABLE: &'static str = "TABLE";
29const TYPE: &'static str = "TYPE";
30const FUNCTION: &'static str = "FUNCTION";
31const AGGREGATE: &'static str = "AGGREGATE";
32
33#[derive(Debug, PartialEq)]
37pub enum SimpleServerEvent {
38 TopologyChange,
39 StatusChange,
40 SchemaChange,
41}
42
43impl SimpleServerEvent {
44 pub fn as_string(&self) -> String {
45 match *self {
46 SimpleServerEvent::TopologyChange => String::from(TOPOLOGY_CHANGE),
47 SimpleServerEvent::StatusChange => String::from(STATUS_CHANGE),
48 SimpleServerEvent::SchemaChange => String::from(SCHEMA_CHANGE),
49 }
50 }
51}
52
53impl From<ServerEvent> for SimpleServerEvent {
54 fn from(event: ServerEvent) -> SimpleServerEvent {
55 match event {
56 ServerEvent::TopologyChange(_) => SimpleServerEvent::TopologyChange,
57 ServerEvent::StatusChange(_) => SimpleServerEvent::StatusChange,
58 ServerEvent::SchemaChange(_) => SimpleServerEvent::SchemaChange,
59 }
60 }
61}
62
63impl<'a> From<&'a ServerEvent> for SimpleServerEvent {
64 fn from(event: &'a ServerEvent) -> SimpleServerEvent {
65 match *event {
66 ServerEvent::TopologyChange(_) => SimpleServerEvent::TopologyChange,
67 ServerEvent::StatusChange(_) => SimpleServerEvent::StatusChange,
68 ServerEvent::SchemaChange(_) => SimpleServerEvent::SchemaChange,
69 }
70 }
71}
72
73impl PartialEq<ServerEvent> for SimpleServerEvent {
74 fn eq(&self, full_event: &ServerEvent) -> bool {
75 *self == SimpleServerEvent::from(full_event)
76 }
77}
78
79#[derive(Debug)]
81pub enum ServerEvent {
82 TopologyChange(TopologyChange),
84 StatusChange(StatusChange),
86 SchemaChange(SchemaChange),
88}
89
90impl PartialEq<SimpleServerEvent> for ServerEvent {
91 fn eq(&self, event: &SimpleServerEvent) -> bool {
92 SimpleServerEvent::from(self) == *event
93 }
94}
95
96impl FromCursor for ServerEvent {
97 fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<ServerEvent> {
98 let event_type = CString::from_cursor(&mut cursor)?;
99 match event_type.as_str() {
100 TOPOLOGY_CHANGE => {
101 Ok(ServerEvent::TopologyChange(TopologyChange::from_cursor(&mut cursor)?))
102 }
103 STATUS_CHANGE => Ok(ServerEvent::StatusChange(StatusChange::from_cursor(&mut cursor)?)),
104 SCHEMA_CHANGE => Ok(ServerEvent::SchemaChange(SchemaChange::from_cursor(&mut cursor)?)),
105 _ => Err("Unexpected server event".into()),
106 }
107 }
108}
109
110#[derive(Debug)]
112pub struct TopologyChange {
113 pub change_type: TopologyChangeType,
114 pub addr: CInet,
115}
116
117impl FromCursor for TopologyChange {
118 fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<TopologyChange> {
119 let change_type = TopologyChangeType::from_cursor(&mut cursor)?;
120 let addr = CInet::from_cursor(&mut cursor)?;
121
122 Ok(TopologyChange { change_type: change_type,
123 addr: addr, })
124 }
125}
126
127#[derive(Debug, PartialEq)]
128pub enum TopologyChangeType {
129 NewNode,
130 RemovedNode,
131}
132
133impl FromCursor for TopologyChangeType {
134 fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<TopologyChangeType> {
135 CString::from_cursor(&mut cursor).and_then(|tc| match tc.as_str() {
136 NEW_NODE => Ok(TopologyChangeType::NewNode),
137 REMOVED_NODE => Ok(TopologyChangeType::RemovedNode),
138 _ => Err("Unexpected topology change type received from Cluster".into()),
139 })
140 }
141}
142
143#[derive(Debug)]
145pub struct StatusChange {
146 pub change_type: StatusChangeType,
147 pub addr: CInet,
148}
149
150impl FromCursor for StatusChange {
151 fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<StatusChange> {
152 let change_type = StatusChangeType::from_cursor(&mut cursor)?;
153 let addr = CInet::from_cursor(&mut cursor)?;
154
155 Ok(StatusChange { change_type: change_type,
156 addr: addr, })
157 }
158}
159
160#[derive(Debug, PartialEq)]
161pub enum StatusChangeType {
162 Up,
163 Down,
164}
165
166impl FromCursor for StatusChangeType {
167 fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<StatusChangeType> {
168 CString::from_cursor(&mut cursor).and_then(|sct| match sct.as_str() {
169 UP => Ok(StatusChangeType::Up),
170 DOWN => Ok(StatusChangeType::Down),
171 _ => Err("Unexpected status change type".into()),
172 })
173 }
174}
175
176#[derive(Debug, PartialEq)]
178pub struct SchemaChange {
179 pub change_type: ChangeType,
180 pub target: Target,
181 pub options: ChangeSchemeOptions,
182}
183
184impl FromCursor for SchemaChange {
185 fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<SchemaChange> {
186 let change_type = ChangeType::from_cursor(&mut cursor)?;
187 let target = Target::from_cursor(&mut cursor)?;
188 let options = ChangeSchemeOptions::from_cursor_and_target(&mut cursor, &target)?;
189
190 Ok(SchemaChange { change_type: change_type,
191 target: target,
192 options: options, })
193 }
194}
195
196#[derive(Debug, PartialEq)]
199pub enum ChangeType {
200 Created,
201 Updated,
202 Dropped,
203}
204
205impl FromCursor for ChangeType {
206 fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<ChangeType> {
207 CString::from_cursor(&mut cursor).and_then(|ct| match ct.as_str() {
208 CREATED => Ok(ChangeType::Created),
209 UPDATED => Ok(ChangeType::Updated),
210 DROPPED => Ok(ChangeType::Dropped),
211 _ => Err("Unexpected schema change type".into()),
212 })
213 }
214}
215
216#[derive(Debug, PartialEq)]
219pub enum Target {
220 Keyspace,
221 Table,
222 Type,
223 Function,
224 Aggregate,
225}
226
227impl FromCursor for Target {
228 fn from_cursor(mut cursor: &mut Cursor<&[u8]>) -> error::Result<Target> {
229 CString::from_cursor(&mut cursor).and_then(|t| match t.as_str() {
230 KEYSPACE => Ok(Target::Keyspace),
231 TABLE => Ok(Target::Table),
232 TYPE => Ok(Target::Type),
233 FUNCTION => Ok(Target::Function),
234 AGGREGATE => Ok(Target::Aggregate),
235 _ => Err("Unexpected schema change target".into()),
236 })
237 }
238}
239
240#[derive(Debug, PartialEq)]
242pub enum ChangeSchemeOptions {
243 Keyspace(String),
245 TableType((String, String)),
247 FunctionAggregate((String, String, Vec<String>)),
252}
253
254impl ChangeSchemeOptions {
255 fn from_cursor_and_target(mut cursor: &mut Cursor<&[u8]>,
256 target: &Target)
257 -> error::Result<ChangeSchemeOptions> {
258 Ok(match *target {
259 Target::Keyspace => ChangeSchemeOptions::from_cursor_keyspace(&mut cursor)?,
260 Target::Table | Target::Type => {
261 ChangeSchemeOptions::from_cursor_table_type(&mut cursor)?
262 }
263 Target::Function | Target::Aggregate => {
264 ChangeSchemeOptions::from_cursor_function_aggregate(&mut cursor)?
265 }
266 })
267 }
268
269 fn from_cursor_keyspace(mut cursor: &mut Cursor<&[u8]>) -> error::Result<ChangeSchemeOptions> {
270 Ok(ChangeSchemeOptions::Keyspace(CString::from_cursor(&mut cursor)?.into_plain()))
271 }
272
273 fn from_cursor_table_type(mut cursor: &mut Cursor<&[u8]>)
274 -> error::Result<ChangeSchemeOptions> {
275 let keyspace = CString::from_cursor(&mut cursor)?.into_plain();
276 let name = CString::from_cursor(&mut cursor)?.into_plain();
277 Ok(ChangeSchemeOptions::TableType((keyspace, name)))
278 }
279
280 fn from_cursor_function_aggregate(mut cursor: &mut Cursor<&[u8]>)
281 -> error::Result<ChangeSchemeOptions> {
282 let keyspace = CString::from_cursor(&mut cursor)?.into_plain();
283 let name = CString::from_cursor(&mut cursor)?.into_plain();
284 let types = CStringList::from_cursor(&mut cursor)?.into_plain();
285 Ok(ChangeSchemeOptions::FunctionAggregate((keyspace,
286 name,
287 types)))
288 }
289}
290
291#[cfg(test)]
292mod simple_server_event_test {
293 use super::*;
294
295 #[test]
296 fn as_string() {
297 assert_eq!(SimpleServerEvent::TopologyChange.as_string(),
298 "TOPOLOGY_CHANGE".to_string());
299 assert_eq!(SimpleServerEvent::StatusChange.as_string(),
300 "STATUS_CHANGE".to_string());
301 assert_eq!(SimpleServerEvent::SchemaChange.as_string(),
302 "SCHEMA_CHANGE".to_string());
303 }
304}
305
306#[cfg(test)]
307mod topology_change_type_test {
308 use super::*;
309 use std::io::Cursor;
310 use crate::frame::traits::FromCursor;
311
312 #[test]
313 fn from_cursor() {
314 let a = &[0, 8, 78, 69, 87, 95, 78, 79, 68, 69];
315 let mut new_node: Cursor<&[u8]> = Cursor::new(a);
316 assert_eq!(TopologyChangeType::from_cursor(&mut new_node).unwrap(),
317 TopologyChangeType::NewNode);
318
319 let b = &[0, 12, 82, 69, 77, 79, 86, 69, 68, 95, 78, 79, 68, 69];
320 let mut removed_node: Cursor<&[u8]> = Cursor::new(b);
321 assert_eq!(TopologyChangeType::from_cursor(&mut removed_node).unwrap(),
322 TopologyChangeType::RemovedNode);
323 }
324
325 #[test]
326 #[should_panic]
327 fn from_cursor_wrong() {
328 let a = &[0, 1, 78];
329 let mut wrong: Cursor<&[u8]> = Cursor::new(a);
330 let _ = TopologyChangeType::from_cursor(&mut wrong).unwrap();
331 }
332}
333
334#[cfg(test)]
335mod status_change_type_test {
336 use super::*;
337 use std::io::Cursor;
338 use crate::frame::traits::FromCursor;
339
340 #[test]
341 fn from_cursor() {
342 let a = &[0, 2, 85, 80];
343 let mut up: Cursor<&[u8]> = Cursor::new(a);
344 assert_eq!(StatusChangeType::from_cursor(&mut up).unwrap(),
345 StatusChangeType::Up);
346
347 let b = &[0, 4, 68, 79, 87, 78];
348 let mut down: Cursor<&[u8]> = Cursor::new(b);
349 assert_eq!(StatusChangeType::from_cursor(&mut down).unwrap(),
350 StatusChangeType::Down);
351 }
352
353 #[test]
354 #[should_panic]
355 fn from_cursor_wrong() {
356 let a = &[0, 1, 78];
357 let mut wrong: Cursor<&[u8]> = Cursor::new(a);
358 let _ = StatusChangeType::from_cursor(&mut wrong).unwrap();
359 }
360}
361
362#[cfg(test)]
363mod schema_change_type_test {
364 use super::*;
365 use std::io::Cursor;
366 use crate::frame::traits::FromCursor;
367
368 #[test]
369 fn from_cursor() {
370 let a = &[0, 7, 67, 82, 69, 65, 84, 69, 68];
371 let mut created: Cursor<&[u8]> = Cursor::new(a);
372 assert_eq!(ChangeType::from_cursor(&mut created).unwrap(),
373 ChangeType::Created);
374
375 let b = &[0, 7, 85, 80, 68, 65, 84, 69, 68];
376 let mut updated: Cursor<&[u8]> = Cursor::new(b);
377 assert_eq!(ChangeType::from_cursor(&mut updated).unwrap(),
378 ChangeType::Updated);
379
380 let c = &[0, 7, 68, 82, 79, 80, 80, 69, 68];
381 let mut dropped: Cursor<&[u8]> = Cursor::new(c);
382 assert_eq!(ChangeType::from_cursor(&mut dropped).unwrap(),
383 ChangeType::Dropped);
384 }
385
386 #[test]
387 #[should_panic]
388 fn from_cursor_wrong() {
389 let a = &[0, 1, 78];
390 let mut wrong: Cursor<&[u8]> = Cursor::new(a);
391 let _ = ChangeType::from_cursor(&mut wrong).unwrap();
392 }
393}
394
395#[cfg(test)]
396mod schema_change_target_test {
397 use super::*;
398 use std::io::Cursor;
399 use crate::frame::traits::FromCursor;
400
401 #[test]
402 fn from_cursor() {
403 let a = &[0, 8, 75, 69, 89, 83, 80, 65, 67, 69];
404 let mut keyspace: Cursor<&[u8]> = Cursor::new(a);
405 assert_eq!(Target::from_cursor(&mut keyspace).unwrap(),
406 Target::Keyspace);
407
408 let b = &[0, 5, 84, 65, 66, 76, 69];
409 let mut table: Cursor<&[u8]> = Cursor::new(b);
410 assert_eq!(Target::from_cursor(&mut table).unwrap(), Target::Table);
411
412 let c = &[0, 4, 84, 89, 80, 69];
413 let mut _type: Cursor<&[u8]> = Cursor::new(c);
414 assert_eq!(Target::from_cursor(&mut _type).unwrap(), Target::Type);
415
416 let d = &[0, 8, 70, 85, 78, 67, 84, 73, 79, 78];
417 let mut function: Cursor<&[u8]> = Cursor::new(d);
418 assert_eq!(Target::from_cursor(&mut function).unwrap(),
419 Target::Function);
420
421 let e = &[0, 9, 65, 71, 71, 82, 69, 71, 65, 84, 69];
422 let mut aggregate: Cursor<&[u8]> = Cursor::new(e);
423 assert_eq!(Target::from_cursor(&mut aggregate).unwrap(),
424 Target::Aggregate);
425 }
426
427 #[test]
428 #[should_panic]
429 fn from_cursor_wrong() {
430 let a = &[0, 1, 78];
431 let mut wrong: Cursor<&[u8]> = Cursor::new(a);
432 let _ = Target::from_cursor(&mut wrong).unwrap();
433 }
434}
435
436#[cfg(test)]
437mod server_event {
438 use super::*;
439 use std::io::Cursor;
440 use crate::frame::traits::FromCursor;
441
442 #[test]
443 fn topology_change_new_node() {
444 let bytes = &[0,
446 15,
447 84,
448 79,
449 80,
450 79,
451 76,
452 79,
453 71,
454 89,
455 95,
456 67,
457 72,
458 65,
459 78,
460 71,
461 69,
462 0,
464 8,
465 78,
466 69,
467 87,
468 95,
469 78,
470 79,
471 68,
472 69,
473 0,
475 4,
476 127,
477 0,
478 0,
479 1,
480 0,
481 0,
482 0,
483 1];
484 let mut c: Cursor<&[u8]> = Cursor::new(bytes);
485 let event = ServerEvent::from_cursor(&mut c).unwrap();
486 match event {
487 ServerEvent::TopologyChange(ref tc) => {
488 assert_eq!(tc.change_type, TopologyChangeType::NewNode);
489 assert_eq!(format!("{:?}", tc.addr.addr), "V4(127.0.0.1:1)");
490 }
491 _ => panic!("should be topology change event"),
492 }
493 }
494
495 #[test]
496 fn topology_change_removed_node() {
497 let bytes = &[0,
499 15,
500 84,
501 79,
502 80,
503 79,
504 76,
505 79,
506 71,
507 89,
508 95,
509 67,
510 72,
511 65,
512 78,
513 71,
514 69,
515 0,
517 12,
518 82,
519 69,
520 77,
521 79,
522 86,
523 69,
524 68,
525 95,
526 78,
527 79,
528 68,
529 69,
530 0,
532 4,
533 127,
534 0,
535 0,
536 1,
537 0,
538 0,
539 0,
540 1];
541 let mut c: Cursor<&[u8]> = Cursor::new(bytes);
542 let event = ServerEvent::from_cursor(&mut c).unwrap();
543 match event {
544 ServerEvent::TopologyChange(ref tc) => {
545 assert_eq!(tc.change_type, TopologyChangeType::RemovedNode);
546 assert_eq!(format!("{:?}", tc.addr.addr), "V4(127.0.0.1:1)");
547 }
548 _ => panic!("should be topology change event"),
549 }
550 }
551
552 #[test]
553 fn status_change_up() {
554 let bytes = &[0,
556 13,
557 83,
558 84,
559 65,
560 84,
561 85,
562 83,
563 95,
564 67,
565 72,
566 65,
567 78,
568 71,
569 69,
570 0,
572 2,
573 85,
574 80,
575 0,
577 4,
578 127,
579 0,
580 0,
581 1,
582 0,
583 0,
584 0,
585 1];
586 let mut c: Cursor<&[u8]> = Cursor::new(bytes);
587 let event = ServerEvent::from_cursor(&mut c).unwrap();
588 match event {
589 ServerEvent::StatusChange(ref tc) => {
590 assert_eq!(tc.change_type, StatusChangeType::Up);
591 assert_eq!(format!("{:?}", tc.addr.addr), "V4(127.0.0.1:1)");
592 }
593 _ => panic!("should be status change up"),
594 }
595 }
596
597 #[test]
598 fn status_change_down() {
599 let bytes = &[0,
601 13,
602 83,
603 84,
604 65,
605 84,
606 85,
607 83,
608 95,
609 67,
610 72,
611 65,
612 78,
613 71,
614 69,
615 0,
617 4,
618 68,
619 79,
620 87,
621 78,
622 0,
624 4,
625 127,
626 0,
627 0,
628 1,
629 0,
630 0,
631 0,
632 1];
633 let mut c: Cursor<&[u8]> = Cursor::new(bytes);
634 let event = ServerEvent::from_cursor(&mut c).unwrap();
635 match event {
636 ServerEvent::StatusChange(ref tc) => {
637 assert_eq!(tc.change_type, StatusChangeType::Down);
638 assert_eq!(format!("{:?}", tc.addr.addr), "V4(127.0.0.1:1)");
639 }
640 _ => panic!("should be status change down"),
641 }
642 }
643
644 #[test]
645 fn schema_change_created() {
646 let keyspace = &[0,
649 13,
650 83,
651 67,
652 72,
653 69,
654 77,
655 65,
656 95,
657 67,
658 72,
659 65,
660 78,
661 71,
662 69,
663 0,
665 7,
666 67,
667 82,
668 69,
669 65,
670 84,
671 69,
672 68,
673 0,
675 8,
676 75,
677 69,
678 89,
679 83,
680 80,
681 65,
682 67,
683 69,
684 0,
686 5,
687 109,
688 121,
689 95,
690 107,
691 115];
692 let mut ks: Cursor<&[u8]> = Cursor::new(keyspace);
693 let ks_event = ServerEvent::from_cursor(&mut ks).unwrap();
694 match ks_event {
695 ServerEvent::SchemaChange(ref _c) => {
696 assert_eq!(_c.change_type, ChangeType::Created);
697 assert_eq!(_c.target, Target::Keyspace);
698 match _c.options {
699 ChangeSchemeOptions::Keyspace(ref ks) => assert_eq!(ks.as_str(), "my_ks"),
700 _ => panic!("should be keyspace"),
701 }
702 }
703 _ => panic!("should be schema change"),
704 }
705 let table = &[0,
708 13,
709 83,
710 67,
711 72,
712 69,
713 77,
714 65,
715 95,
716 67,
717 72,
718 65,
719 78,
720 71,
721 69,
722 0,
724 7,
725 67,
726 82,
727 69,
728 65,
729 84,
730 69,
731 68,
732 0,
734 5,
735 84,
736 65,
737 66,
738 76,
739 69,
740 0,
742 5,
743 109,
744 121,
745 95,
746 107,
747 115,
748 0,
750 8,
751 109,
752 121,
753 95,
754 116,
755 97,
756 98,
757 108,
758 101];
759 let mut tb: Cursor<&[u8]> = Cursor::new(table);
760 let tb_event = ServerEvent::from_cursor(&mut tb).unwrap();
761 match tb_event {
762 ServerEvent::SchemaChange(ref _c) => {
763 assert_eq!(_c.change_type, ChangeType::Created);
764 assert_eq!(_c.target, Target::Table);
765 match _c.options {
766 ChangeSchemeOptions::TableType(ref tt) => {
767 assert_eq!(tt, &("my_ks".to_string(), "my_table".to_string()))
768 }
769 _ => panic!("should be table"),
770 }
771 }
772 _ => panic!("should be schema change"),
773 }
774 let _type = &[0,
777 13,
778 83,
779 67,
780 72,
781 69,
782 77,
783 65,
784 95,
785 67,
786 72,
787 65,
788 78,
789 71,
790 69,
791 0,
793 7,
794 67,
795 82,
796 69,
797 65,
798 84,
799 69,
800 68,
801 0,
803 4,
804 84,
805 89,
806 80,
807 69,
808 0,
810 5,
811 109,
812 121,
813 95,
814 107,
815 115,
816 0,
818 8,
819 109,
820 121,
821 95,
822 116,
823 97,
824 98,
825 108,
826 101];
827 let mut tp: Cursor<&[u8]> = Cursor::new(_type);
828 let tp_event = ServerEvent::from_cursor(&mut tp).unwrap();
829 match tp_event {
830 ServerEvent::SchemaChange(ref _c) => {
831 assert_eq!(_c.change_type, ChangeType::Created);
832 assert_eq!(_c.target, Target::Type);
833 match _c.options {
834 ChangeSchemeOptions::TableType(ref tt) => {
835 assert_eq!(tt, &("my_ks".to_string(), "my_table".to_string()))
836 }
837 _ => panic!("should be type"),
838 }
839 }
840 _ => panic!("should be schema change"),
841 }
842 let function = &[0,
845 13,
846 83,
847 67,
848 72,
849 69,
850 77,
851 65,
852 95,
853 67,
854 72,
855 65,
856 78,
857 71,
858 69,
859 0,
861 7,
862 67,
863 82,
864 69,
865 65,
866 84,
867 69,
868 68,
869 0,
871 8,
872 70,
873 85,
874 78,
875 67,
876 84,
877 73,
878 79,
879 78,
880 0,
882 5,
883 109,
884 121,
885 95,
886 107,
887 115,
888 0,
890 4,
891 110,
892 97,
893 109,
894 101,
895 0,
897 0];
898 let mut fnct: Cursor<&[u8]> = Cursor::new(function);
899 let fnct_event = ServerEvent::from_cursor(&mut fnct).unwrap();
900 match fnct_event {
901 ServerEvent::SchemaChange(ref _c) => {
902 assert_eq!(_c.change_type, ChangeType::Created);
903 assert_eq!(_c.target, Target::Function);
904 match _c.options {
905 ChangeSchemeOptions::FunctionAggregate(ref tt) => {
906 assert_eq!(tt, &("my_ks".to_string(), "name".to_string(), vec![]))
907 }
908 _ => panic!("should be function"),
909 }
910 }
911 _ => panic!("should be schema change"),
912 }
913 let aggregate = &[0,
916 13,
917 83,
918 67,
919 72,
920 69,
921 77,
922 65,
923 95,
924 67,
925 72,
926 65,
927 78,
928 71,
929 69,
930 0,
932 7,
933 67,
934 82,
935 69,
936 65,
937 84,
938 69,
939 68,
940 0,
942 9,
943 65,
944 71,
945 71,
946 82,
947 69,
948 71,
949 65,
950 84,
951 69,
952 0,
954 5,
955 109,
956 121,
957 95,
958 107,
959 115,
960 0,
962 4,
963 110,
964 97,
965 109,
966 101,
967 0,
969 0];
970 let mut aggr: Cursor<&[u8]> = Cursor::new(aggregate);
971 let aggr_event = ServerEvent::from_cursor(&mut aggr).unwrap();
972 match aggr_event {
973 ServerEvent::SchemaChange(ref _c) => {
974 assert_eq!(_c.change_type, ChangeType::Created);
975 assert_eq!(_c.target, Target::Aggregate);
976 match _c.options {
977 ChangeSchemeOptions::FunctionAggregate(ref tt) => {
978 assert_eq!(tt, &("my_ks".to_string(), "name".to_string(), vec![]))
979 }
980 _ => panic!("should be aggregate"),
981 }
982 }
983 _ => panic!("should be schema change"),
984 }
985 }
986
987 #[test]
988 fn schema_change_updated() {
989 let keyspace = &[0,
992 13,
993 83,
994 67,
995 72,
996 69,
997 77,
998 65,
999 95,
1000 67,
1001 72,
1002 65,
1003 78,
1004 71,
1005 69,
1006 0,
1008 7,
1009 85,
1010 80,
1011 68,
1012 65,
1013 84,
1014 69,
1015 68,
1016 0,
1018 8,
1019 75,
1020 69,
1021 89,
1022 83,
1023 80,
1024 65,
1025 67,
1026 69,
1027 0,
1029 5,
1030 109,
1031 121,
1032 95,
1033 107,
1034 115];
1035 let mut ks: Cursor<&[u8]> = Cursor::new(keyspace);
1036 let ks_event = ServerEvent::from_cursor(&mut ks).unwrap();
1037 match ks_event {
1038 ServerEvent::SchemaChange(ref _c) => {
1039 assert_eq!(_c.change_type, ChangeType::Updated);
1040 assert_eq!(_c.target, Target::Keyspace);
1041 match _c.options {
1042 ChangeSchemeOptions::Keyspace(ref ks) => assert_eq!(ks.as_str(), "my_ks"),
1043 _ => panic!("should be keyspace"),
1044 }
1045 }
1046 _ => panic!("should be schema change"),
1047 }
1048 let table = &[0,
1051 13,
1052 83,
1053 67,
1054 72,
1055 69,
1056 77,
1057 65,
1058 95,
1059 67,
1060 72,
1061 65,
1062 78,
1063 71,
1064 69,
1065 0,
1067 7,
1068 85,
1069 80,
1070 68,
1071 65,
1072 84,
1073 69,
1074 68,
1075 0,
1077 5,
1078 84,
1079 65,
1080 66,
1081 76,
1082 69,
1083 0,
1085 5,
1086 109,
1087 121,
1088 95,
1089 107,
1090 115,
1091 0,
1093 8,
1094 109,
1095 121,
1096 95,
1097 116,
1098 97,
1099 98,
1100 108,
1101 101];
1102 let mut tb: Cursor<&[u8]> = Cursor::new(table);
1103 let tb_event = ServerEvent::from_cursor(&mut tb).unwrap();
1104 match tb_event {
1105 ServerEvent::SchemaChange(ref _c) => {
1106 assert_eq!(_c.change_type, ChangeType::Updated);
1107 assert_eq!(_c.target, Target::Table);
1108 match _c.options {
1109 ChangeSchemeOptions::TableType(ref tt) => {
1110 assert_eq!(tt, &("my_ks".to_string(), "my_table".to_string()))
1111 }
1112 _ => panic!("should be table"),
1113 }
1114 }
1115 _ => panic!("should be schema change"),
1116 }
1117 let _type = &[0,
1120 13,
1121 83,
1122 67,
1123 72,
1124 69,
1125 77,
1126 65,
1127 95,
1128 67,
1129 72,
1130 65,
1131 78,
1132 71,
1133 69,
1134 0,
1136 7,
1137 85,
1138 80,
1139 68,
1140 65,
1141 84,
1142 69,
1143 68,
1144 0,
1146 4,
1147 84,
1148 89,
1149 80,
1150 69,
1151 0,
1153 5,
1154 109,
1155 121,
1156 95,
1157 107,
1158 115,
1159 0,
1161 8,
1162 109,
1163 121,
1164 95,
1165 116,
1166 97,
1167 98,
1168 108,
1169 101];
1170 let mut tp: Cursor<&[u8]> = Cursor::new(_type);
1171 let tp_event = ServerEvent::from_cursor(&mut tp).unwrap();
1172 match tp_event {
1173 ServerEvent::SchemaChange(ref _c) => {
1174 assert_eq!(_c.change_type, ChangeType::Updated);
1175 assert_eq!(_c.target, Target::Type);
1176 match _c.options {
1177 ChangeSchemeOptions::TableType(ref tt) => {
1178 assert_eq!(tt, &("my_ks".to_string(), "my_table".to_string()))
1179 }
1180 _ => panic!("should be type"),
1181 }
1182 }
1183 _ => panic!("should be schema change"),
1184 }
1185 let function = &[0,
1188 13,
1189 83,
1190 67,
1191 72,
1192 69,
1193 77,
1194 65,
1195 95,
1196 67,
1197 72,
1198 65,
1199 78,
1200 71,
1201 69,
1202 0,
1204 7,
1205 85,
1206 80,
1207 68,
1208 65,
1209 84,
1210 69,
1211 68,
1212 0,
1214 8,
1215 70,
1216 85,
1217 78,
1218 67,
1219 84,
1220 73,
1221 79,
1222 78,
1223 0,
1225 5,
1226 109,
1227 121,
1228 95,
1229 107,
1230 115,
1231 0,
1233 4,
1234 110,
1235 97,
1236 109,
1237 101,
1238 0,
1240 0];
1241 let mut fnct: Cursor<&[u8]> = Cursor::new(function);
1242 let fnct_event = ServerEvent::from_cursor(&mut fnct).unwrap();
1243 match fnct_event {
1244 ServerEvent::SchemaChange(ref _c) => {
1245 assert_eq!(_c.change_type, ChangeType::Updated);
1246 assert_eq!(_c.target, Target::Function);
1247 match _c.options {
1248 ChangeSchemeOptions::FunctionAggregate(ref tt) => {
1249 assert_eq!(tt, &("my_ks".to_string(), "name".to_string(), vec![]))
1250 }
1251 _ => panic!("should be function"),
1252 }
1253 }
1254 _ => panic!("should be schema change"),
1255 }
1256 let aggregate = &[0,
1259 13,
1260 83,
1261 67,
1262 72,
1263 69,
1264 77,
1265 65,
1266 95,
1267 67,
1268 72,
1269 65,
1270 78,
1271 71,
1272 69,
1273 0,
1275 7,
1276 85,
1277 80,
1278 68,
1279 65,
1280 84,
1281 69,
1282 68,
1283 0,
1285 9,
1286 65,
1287 71,
1288 71,
1289 82,
1290 69,
1291 71,
1292 65,
1293 84,
1294 69,
1295 0,
1297 5,
1298 109,
1299 121,
1300 95,
1301 107,
1302 115,
1303 0,
1305 4,
1306 110,
1307 97,
1308 109,
1309 101,
1310 0,
1312 0];
1313 let mut aggr: Cursor<&[u8]> = Cursor::new(aggregate);
1314 let aggr_event = ServerEvent::from_cursor(&mut aggr).unwrap();
1315 match aggr_event {
1316 ServerEvent::SchemaChange(ref _c) => {
1317 assert_eq!(_c.change_type, ChangeType::Updated);
1318 assert_eq!(_c.target, Target::Aggregate);
1319 match _c.options {
1320 ChangeSchemeOptions::FunctionAggregate(ref tt) => {
1321 assert_eq!(tt, &("my_ks".to_string(), "name".to_string(), vec![]))
1322 }
1323 _ => panic!("should be aggregate"),
1324 }
1325 }
1326 _ => panic!("should be schema change"),
1327 }
1328 }
1329
1330 #[test]
1331 fn schema_change_dropped() {
1332 let keyspace = &[0,
1335 13,
1336 83,
1337 67,
1338 72,
1339 69,
1340 77,
1341 65,
1342 95,
1343 67,
1344 72,
1345 65,
1346 78,
1347 71,
1348 69,
1349 0,
1351 7,
1352 68,
1353 82,
1354 79,
1355 80,
1356 80,
1357 69,
1358 68,
1359 0,
1361 8,
1362 75,
1363 69,
1364 89,
1365 83,
1366 80,
1367 65,
1368 67,
1369 69,
1370 0,
1372 5,
1373 109,
1374 121,
1375 95,
1376 107,
1377 115];
1378 let mut ks: Cursor<&[u8]> = Cursor::new(keyspace);
1379 let ks_event = ServerEvent::from_cursor(&mut ks).unwrap();
1380 match ks_event {
1381 ServerEvent::SchemaChange(ref _c) => {
1382 assert_eq!(_c.change_type, ChangeType::Dropped);
1383 assert_eq!(_c.target, Target::Keyspace);
1384 match _c.options {
1385 ChangeSchemeOptions::Keyspace(ref ks) => assert_eq!(ks.as_str(), "my_ks"),
1386 _ => panic!("should be keyspace"),
1387 }
1388 }
1389 _ => panic!("should be schema change"),
1390 }
1391 let table = &[0,
1394 13,
1395 83,
1396 67,
1397 72,
1398 69,
1399 77,
1400 65,
1401 95,
1402 67,
1403 72,
1404 65,
1405 78,
1406 71,
1407 69,
1408 0,
1410 7,
1411 68,
1412 82,
1413 79,
1414 80,
1415 80,
1416 69,
1417 68,
1418 0,
1420 5,
1421 84,
1422 65,
1423 66,
1424 76,
1425 69,
1426 0,
1428 5,
1429 109,
1430 121,
1431 95,
1432 107,
1433 115,
1434 0,
1436 8,
1437 109,
1438 121,
1439 95,
1440 116,
1441 97,
1442 98,
1443 108,
1444 101];
1445 let mut tb: Cursor<&[u8]> = Cursor::new(table);
1446 let tb_event = ServerEvent::from_cursor(&mut tb).unwrap();
1447 match tb_event {
1448 ServerEvent::SchemaChange(ref _c) => {
1449 assert_eq!(_c.change_type, ChangeType::Dropped);
1450 assert_eq!(_c.target, Target::Table);
1451 match _c.options {
1452 ChangeSchemeOptions::TableType(ref tt) => {
1453 assert_eq!(tt, &("my_ks".to_string(), "my_table".to_string()))
1454 }
1455 _ => panic!("should be table"),
1456 }
1457 }
1458 _ => panic!("should be schema change"),
1459 }
1460 let _type = &[0,
1463 13,
1464 83,
1465 67,
1466 72,
1467 69,
1468 77,
1469 65,
1470 95,
1471 67,
1472 72,
1473 65,
1474 78,
1475 71,
1476 69,
1477 0,
1479 7,
1480 68,
1481 82,
1482 79,
1483 80,
1484 80,
1485 69,
1486 68,
1487 0,
1489 4,
1490 84,
1491 89,
1492 80,
1493 69,
1494 0,
1496 5,
1497 109,
1498 121,
1499 95,
1500 107,
1501 115,
1502 0,
1504 8,
1505 109,
1506 121,
1507 95,
1508 116,
1509 97,
1510 98,
1511 108,
1512 101];
1513 let mut tp: Cursor<&[u8]> = Cursor::new(_type);
1514 let tp_event = ServerEvent::from_cursor(&mut tp).unwrap();
1515 match tp_event {
1516 ServerEvent::SchemaChange(ref _c) => {
1517 assert_eq!(_c.change_type, ChangeType::Dropped);
1518 assert_eq!(_c.target, Target::Type);
1519 match _c.options {
1520 ChangeSchemeOptions::TableType(ref tt) => {
1521 assert_eq!(tt, &("my_ks".to_string(), "my_table".to_string()))
1522 }
1523 _ => panic!("should be type"),
1524 }
1525 }
1526 _ => panic!("should be schema change"),
1527 }
1528 let function = &[0,
1531 13,
1532 83,
1533 67,
1534 72,
1535 69,
1536 77,
1537 65,
1538 95,
1539 67,
1540 72,
1541 65,
1542 78,
1543 71,
1544 69,
1545 0,
1547 7,
1548 68,
1549 82,
1550 79,
1551 80,
1552 80,
1553 69,
1554 68,
1555 0,
1557 8,
1558 70,
1559 85,
1560 78,
1561 67,
1562 84,
1563 73,
1564 79,
1565 78,
1566 0,
1568 5,
1569 109,
1570 121,
1571 95,
1572 107,
1573 115,
1574 0,
1576 4,
1577 110,
1578 97,
1579 109,
1580 101,
1581 0,
1583 0];
1584 let mut fnct: Cursor<&[u8]> = Cursor::new(function);
1585 let fnct_event = ServerEvent::from_cursor(&mut fnct).unwrap();
1586 match fnct_event {
1587 ServerEvent::SchemaChange(ref _c) => {
1588 assert_eq!(_c.change_type, ChangeType::Dropped);
1589 assert_eq!(_c.target, Target::Function);
1590 match _c.options {
1591 ChangeSchemeOptions::FunctionAggregate(ref tt) => {
1592 assert_eq!(tt, &("my_ks".to_string(), "name".to_string(), vec![]))
1593 }
1594 _ => panic!("should be function"),
1595 }
1596 }
1597 _ => panic!("should be schema change"),
1598 }
1599 let aggregate = &[0,
1602 13,
1603 83,
1604 67,
1605 72,
1606 69,
1607 77,
1608 65,
1609 95,
1610 67,
1611 72,
1612 65,
1613 78,
1614 71,
1615 69,
1616 0,
1618 7,
1619 68,
1620 82,
1621 79,
1622 80,
1623 80,
1624 69,
1625 68,
1626 0,
1628 9,
1629 65,
1630 71,
1631 71,
1632 82,
1633 69,
1634 71,
1635 65,
1636 84,
1637 69,
1638 0,
1640 5,
1641 109,
1642 121,
1643 95,
1644 107,
1645 115,
1646 0,
1648 4,
1649 110,
1650 97,
1651 109,
1652 101,
1653 0,
1655 0];
1656 let mut aggr: Cursor<&[u8]> = Cursor::new(aggregate);
1657 let aggr_event = ServerEvent::from_cursor(&mut aggr).unwrap();
1658 match aggr_event {
1659 ServerEvent::SchemaChange(ref _c) => {
1660 assert_eq!(_c.change_type, ChangeType::Dropped);
1661 assert_eq!(_c.target, Target::Aggregate);
1662 match _c.options {
1663 ChangeSchemeOptions::FunctionAggregate(ref tt) => {
1664 assert_eq!(tt, &("my_ks".to_string(), "name".to_string(), vec![]))
1665 }
1666 _ => panic!("should be aggregate"),
1667 }
1668 }
1669 _ => panic!("should be schema change"),
1670 }
1671 }
1672}