1use crate::frame::traits::FromCursor;
2use crate::frame::{Serialize, Version};
3use crate::types::{from_cursor_str, from_cursor_string_list, serialize_str, CIntShort};
4use crate::{error, Error};
5use derive_more::Display;
6use std::cmp::PartialEq;
7use std::convert::TryFrom;
8use std::io::Cursor;
9use std::net::SocketAddr;
10
11const TOPOLOGY_CHANGE: &str = "TOPOLOGY_CHANGE";
13const STATUS_CHANGE: &str = "STATUS_CHANGE";
14const SCHEMA_CHANGE: &str = "SCHEMA_CHANGE";
15
16const NEW_NODE: &str = "NEW_NODE";
18const REMOVED_NODE: &str = "REMOVED_NODE";
19
20const UP: &str = "UP";
22const DOWN: &str = "DOWN";
23
24const CREATED: &str = "CREATED";
26const UPDATED: &str = "UPDATED";
27const DROPPED: &str = "DROPPED";
28
29const KEYSPACE: &str = "KEYSPACE";
31const TABLE: &str = "TABLE";
32const TYPE: &str = "TYPE";
33const FUNCTION: &str = "FUNCTION";
34const AGGREGATE: &str = "AGGREGATE";
35
36#[derive(Debug, PartialEq, Copy, Clone, Ord, PartialOrd, Eq, Hash)]
40#[non_exhaustive]
41pub enum SimpleServerEvent {
42 TopologyChange,
43 StatusChange,
44 SchemaChange,
45}
46
47impl SimpleServerEvent {
48 pub fn as_str(&self) -> &'static str {
49 match *self {
50 SimpleServerEvent::TopologyChange => TOPOLOGY_CHANGE,
51 SimpleServerEvent::StatusChange => STATUS_CHANGE,
52 SimpleServerEvent::SchemaChange => SCHEMA_CHANGE,
53 }
54 }
55}
56
57impl From<ServerEvent> for SimpleServerEvent {
58 fn from(event: ServerEvent) -> SimpleServerEvent {
59 match event {
60 ServerEvent::TopologyChange(_) => SimpleServerEvent::TopologyChange,
61 ServerEvent::StatusChange(_) => SimpleServerEvent::StatusChange,
62 ServerEvent::SchemaChange(_) => SimpleServerEvent::SchemaChange,
63 }
64 }
65}
66
67impl<'a> From<&'a ServerEvent> for SimpleServerEvent {
68 fn from(event: &'a ServerEvent) -> SimpleServerEvent {
69 match *event {
70 ServerEvent::TopologyChange(_) => SimpleServerEvent::TopologyChange,
71 ServerEvent::StatusChange(_) => SimpleServerEvent::StatusChange,
72 ServerEvent::SchemaChange(_) => SimpleServerEvent::SchemaChange,
73 }
74 }
75}
76
77impl TryFrom<&str> for SimpleServerEvent {
78 type Error = error::Error;
79
80 fn try_from(value: &str) -> Result<Self, Self::Error> {
81 match value {
82 TOPOLOGY_CHANGE => Ok(SimpleServerEvent::TopologyChange),
83 STATUS_CHANGE => Ok(SimpleServerEvent::StatusChange),
84 SCHEMA_CHANGE => Ok(SimpleServerEvent::SchemaChange),
85 value => Err(Error::UnknownServerEvent(value.into())),
86 }
87 }
88}
89
90impl PartialEq<ServerEvent> for SimpleServerEvent {
91 fn eq(&self, full_event: &ServerEvent) -> bool {
92 self == &SimpleServerEvent::from(full_event)
93 }
94}
95
96#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
98#[non_exhaustive]
99pub enum ServerEvent {
100 TopologyChange(TopologyChange),
102 StatusChange(StatusChange),
104 SchemaChange(SchemaChange),
106}
107
108impl Serialize for ServerEvent {
109 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
110 match &self {
111 ServerEvent::TopologyChange(t) => {
112 serialize_str(cursor, TOPOLOGY_CHANGE, version);
113 t.serialize(cursor, version);
114 }
115 ServerEvent::StatusChange(s) => {
116 serialize_str(cursor, STATUS_CHANGE, version);
117 s.serialize(cursor, version);
118 }
119 ServerEvent::SchemaChange(s) => {
120 serialize_str(cursor, SCHEMA_CHANGE, version);
121 s.serialize(cursor, version);
122 }
123 }
124 }
125}
126
127impl PartialEq<SimpleServerEvent> for ServerEvent {
128 fn eq(&self, event: &SimpleServerEvent) -> bool {
129 &SimpleServerEvent::from(self) == event
130 }
131}
132
133impl FromCursor for ServerEvent {
134 fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<ServerEvent> {
135 let event_type = from_cursor_str(cursor)?;
136 match event_type {
137 TOPOLOGY_CHANGE => Ok(ServerEvent::TopologyChange(TopologyChange::from_cursor(
138 cursor, version,
139 )?)),
140 STATUS_CHANGE => Ok(ServerEvent::StatusChange(StatusChange::from_cursor(
141 cursor, version,
142 )?)),
143 SCHEMA_CHANGE => Ok(ServerEvent::SchemaChange(SchemaChange::from_cursor(
144 cursor, version,
145 )?)),
146 _ => Err(Error::UnknownServerEvent(event_type.into())),
147 }
148 }
149}
150
151#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
153pub struct TopologyChange {
154 pub change_type: TopologyChangeType,
155 pub addr: SocketAddr,
156}
157
158impl Serialize for TopologyChange {
159 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
161 self.change_type.serialize(cursor, version);
162 self.addr.serialize(cursor, version);
163 }
164}
165
166impl FromCursor for TopologyChange {
167 fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<TopologyChange> {
168 let change_type = TopologyChangeType::from_cursor(cursor, version)?;
169 let addr = SocketAddr::from_cursor(cursor, version)?;
170
171 Ok(TopologyChange { change_type, addr })
172 }
173}
174
175#[derive(Debug, Copy, Clone, PartialEq, Ord, PartialOrd, Eq, Hash, Display)]
176#[non_exhaustive]
177pub enum TopologyChangeType {
178 NewNode,
179 RemovedNode,
180}
181
182impl Serialize for TopologyChangeType {
183 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
184 match &self {
185 TopologyChangeType::NewNode => serialize_str(cursor, NEW_NODE, version),
186 TopologyChangeType::RemovedNode => serialize_str(cursor, REMOVED_NODE, version),
187 }
188 }
189}
190
191impl FromCursor for TopologyChangeType {
192 fn from_cursor(
193 cursor: &mut Cursor<&[u8]>,
194 _version: Version,
195 ) -> error::Result<TopologyChangeType> {
196 from_cursor_str(cursor).and_then(|tc| match tc {
197 NEW_NODE => Ok(TopologyChangeType::NewNode),
198 REMOVED_NODE => Ok(TopologyChangeType::RemovedNode),
199 _ => Err(Error::UnexpectedTopologyChangeType(tc.into())),
200 })
201 }
202}
203
204#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
206pub struct StatusChange {
207 pub change_type: StatusChangeType,
208 pub addr: SocketAddr,
209}
210
211impl Serialize for StatusChange {
212 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
214 self.change_type.serialize(cursor, version);
215 self.addr.serialize(cursor, version);
216 }
217}
218
219impl FromCursor for StatusChange {
220 fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<StatusChange> {
221 let change_type = StatusChangeType::from_cursor(cursor, version)?;
222 let addr = SocketAddr::from_cursor(cursor, version)?;
223
224 Ok(StatusChange { change_type, addr })
225 }
226}
227
228#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
229#[non_exhaustive]
230pub enum StatusChangeType {
231 Up,
232 Down,
233}
234
235impl Serialize for StatusChangeType {
236 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
237 match self {
238 StatusChangeType::Up => serialize_str(cursor, UP, version),
239 StatusChangeType::Down => serialize_str(cursor, DOWN, version),
240 }
241 }
242}
243
244impl FromCursor for StatusChangeType {
245 fn from_cursor(
246 cursor: &mut Cursor<&[u8]>,
247 _version: Version,
248 ) -> error::Result<StatusChangeType> {
249 from_cursor_str(cursor).and_then(|sct| match sct {
250 UP => Ok(StatusChangeType::Up),
251 DOWN => Ok(StatusChangeType::Down),
252 _ => Err(Error::UnexpectedStatusChangeType(sct.into())),
253 })
254 }
255}
256
257#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
259pub struct SchemaChange {
260 pub change_type: SchemaChangeType,
261 pub target: SchemaChangeTarget,
262 pub options: SchemaChangeOptions,
263}
264
265impl Serialize for SchemaChange {
266 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
267 self.change_type.serialize(cursor, version);
268 self.target.serialize(cursor, version);
269 self.options.serialize(cursor, version);
270 }
271}
272
273impl FromCursor for SchemaChange {
274 fn from_cursor(cursor: &mut Cursor<&[u8]>, version: Version) -> error::Result<SchemaChange> {
275 let change_type = SchemaChangeType::from_cursor(cursor, version)?;
276 let target = SchemaChangeTarget::from_cursor(cursor, version)?;
277 let options = SchemaChangeOptions::from_cursor_and_target(cursor, &target)?;
278
279 Ok(SchemaChange {
280 change_type,
281 target,
282 options,
283 })
284 }
285}
286
287#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
289#[non_exhaustive]
290pub enum SchemaChangeType {
291 Created,
292 Updated,
293 Dropped,
294}
295
296impl Serialize for SchemaChangeType {
297 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
298 match self {
299 SchemaChangeType::Created => serialize_str(cursor, CREATED, version),
300 SchemaChangeType::Updated => serialize_str(cursor, UPDATED, version),
301 SchemaChangeType::Dropped => serialize_str(cursor, DROPPED, version),
302 }
303 }
304}
305
306impl FromCursor for SchemaChangeType {
307 fn from_cursor(
308 cursor: &mut Cursor<&[u8]>,
309 _version: Version,
310 ) -> error::Result<SchemaChangeType> {
311 from_cursor_str(cursor).and_then(|ct| match ct {
312 CREATED => Ok(SchemaChangeType::Created),
313 UPDATED => Ok(SchemaChangeType::Updated),
314 DROPPED => Ok(SchemaChangeType::Dropped),
315 _ => Err(Error::UnexpectedSchemaChangeType(ct.into())),
316 })
317 }
318}
319
320#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
322#[non_exhaustive]
323pub enum SchemaChangeTarget {
324 Keyspace,
325 Table,
326 Type,
327 Function,
328 Aggregate,
329}
330
331impl Serialize for SchemaChangeTarget {
332 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
333 match self {
334 SchemaChangeTarget::Keyspace => serialize_str(cursor, KEYSPACE, version),
335 SchemaChangeTarget::Table => serialize_str(cursor, TABLE, version),
336 SchemaChangeTarget::Type => serialize_str(cursor, TYPE, version),
337 SchemaChangeTarget::Function => serialize_str(cursor, FUNCTION, version),
338 SchemaChangeTarget::Aggregate => serialize_str(cursor, AGGREGATE, version),
339 }
340 }
341}
342
343impl FromCursor for SchemaChangeTarget {
344 fn from_cursor(
345 cursor: &mut Cursor<&[u8]>,
346 _version: Version,
347 ) -> error::Result<SchemaChangeTarget> {
348 from_cursor_str(cursor).and_then(|t| match t {
349 KEYSPACE => Ok(SchemaChangeTarget::Keyspace),
350 TABLE => Ok(SchemaChangeTarget::Table),
351 TYPE => Ok(SchemaChangeTarget::Type),
352 FUNCTION => Ok(SchemaChangeTarget::Function),
353 AGGREGATE => Ok(SchemaChangeTarget::Aggregate),
354 _ => Err(Error::UnexpectedSchemaChangeTarget(t.into())),
355 })
356 }
357}
358
359#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
361#[non_exhaustive]
362pub enum SchemaChangeOptions {
363 Keyspace(String),
365 TableType(String, String),
367 FunctionAggregate(String, String, Vec<String>),
372}
373
374impl Serialize for SchemaChangeOptions {
375 fn serialize(&self, cursor: &mut Cursor<&mut Vec<u8>>, version: Version) {
376 match self {
377 SchemaChangeOptions::Keyspace(ks) => {
378 serialize_str(cursor, ks, version);
379 }
380 SchemaChangeOptions::TableType(ks, t) => {
381 serialize_str(cursor, ks, version);
382 serialize_str(cursor, t, version);
383 }
384 SchemaChangeOptions::FunctionAggregate(ks, fa_name, list) => {
385 serialize_str(cursor, ks, version);
386 serialize_str(cursor, fa_name, version);
387
388 let len = list.len() as CIntShort;
389 len.serialize(cursor, version);
390 list.iter().for_each(|x| serialize_str(cursor, x, version));
391 }
392 }
393 }
394}
395
396impl SchemaChangeOptions {
397 fn from_cursor_and_target(
398 cursor: &mut Cursor<&[u8]>,
399 target: &SchemaChangeTarget,
400 ) -> error::Result<SchemaChangeOptions> {
401 Ok(match *target {
402 SchemaChangeTarget::Keyspace => SchemaChangeOptions::from_cursor_keyspace(cursor)?,
403 SchemaChangeTarget::Table | SchemaChangeTarget::Type => {
404 SchemaChangeOptions::from_cursor_table_type(cursor)?
405 }
406 SchemaChangeTarget::Function | SchemaChangeTarget::Aggregate => {
407 SchemaChangeOptions::from_cursor_function_aggregate(cursor)?
408 }
409 })
410 }
411
412 fn from_cursor_keyspace(cursor: &mut Cursor<&[u8]>) -> error::Result<SchemaChangeOptions> {
413 Ok(SchemaChangeOptions::Keyspace(
414 from_cursor_str(cursor)?.to_string(),
415 ))
416 }
417
418 fn from_cursor_table_type(cursor: &mut Cursor<&[u8]>) -> error::Result<SchemaChangeOptions> {
419 let keyspace = from_cursor_str(cursor)?.to_string();
420 let name = from_cursor_str(cursor)?.to_string();
421 Ok(SchemaChangeOptions::TableType(keyspace, name))
422 }
423
424 fn from_cursor_function_aggregate(
425 cursor: &mut Cursor<&[u8]>,
426 ) -> error::Result<SchemaChangeOptions> {
427 let keyspace = from_cursor_str(cursor)?.to_string();
428 let name = from_cursor_str(cursor)?.to_string();
429 let types = from_cursor_string_list(cursor)?;
430 Ok(SchemaChangeOptions::FunctionAggregate(
431 keyspace, name, types,
432 ))
433 }
434}
435
436#[cfg(test)]
437fn test_encode_decode(bytes: &[u8], expected: ServerEvent) {
438 let mut ks: Cursor<&[u8]> = Cursor::new(bytes);
439 let event = ServerEvent::from_cursor(&mut ks, Version::V4).unwrap();
440 assert_eq!(expected, event);
441
442 let mut buffer = Vec::new();
443 let mut cursor = Cursor::new(&mut buffer);
444 expected.serialize(&mut cursor, Version::V4);
445 assert_eq!(buffer, bytes);
446}
447
448#[cfg(test)]
449mod topology_change_type_test {
450 use super::*;
451 use crate::frame::traits::FromCursor;
452 use std::io::Cursor;
453
454 #[test]
455 fn from_cursor() {
456 let a = &[0, 8, 78, 69, 87, 95, 78, 79, 68, 69];
457 let mut new_node: Cursor<&[u8]> = Cursor::new(a);
458 assert_eq!(
459 TopologyChangeType::from_cursor(&mut new_node, Version::V4).unwrap(),
460 TopologyChangeType::NewNode
461 );
462
463 let b = &[0, 12, 82, 69, 77, 79, 86, 69, 68, 95, 78, 79, 68, 69];
464 let mut removed_node: Cursor<&[u8]> = Cursor::new(b);
465 assert_eq!(
466 TopologyChangeType::from_cursor(&mut removed_node, Version::V4).unwrap(),
467 TopologyChangeType::RemovedNode
468 );
469 }
470
471 #[test]
472 fn serialize() {
473 {
474 let a = &[0, 8, 78, 69, 87, 95, 78, 79, 68, 69];
475 let mut buffer = Vec::new();
476 let mut cursor = Cursor::new(&mut buffer);
477 let new_node = TopologyChangeType::NewNode;
478 new_node.serialize(&mut cursor, Version::V4);
479 assert_eq!(buffer, a);
480 }
481
482 {
483 let b = &[0, 12, 82, 69, 77, 79, 86, 69, 68, 95, 78, 79, 68, 69];
484 let mut buffer = Vec::new();
485 let mut cursor = Cursor::new(&mut buffer);
486 let removed_node = TopologyChangeType::RemovedNode;
487 removed_node.serialize(&mut cursor, Version::V4);
488 assert_eq!(buffer, b);
489 }
490 }
491
492 #[test]
493 #[should_panic]
494 fn from_cursor_wrong() {
495 let a = &[0, 1, 78];
496 let mut wrong: Cursor<&[u8]> = Cursor::new(a);
497 let _ = TopologyChangeType::from_cursor(&mut wrong, Version::V4).unwrap();
498 }
499}
500
501#[cfg(test)]
502mod status_change_type_test {
503 use super::*;
504 use crate::frame::traits::FromCursor;
505 use std::io::Cursor;
506
507 #[test]
508 fn from_cursor() {
509 let a = &[0, 2, 85, 80];
510 let mut up: Cursor<&[u8]> = Cursor::new(a);
511 assert_eq!(
512 StatusChangeType::from_cursor(&mut up, Version::V4).unwrap(),
513 StatusChangeType::Up
514 );
515
516 let b = &[0, 4, 68, 79, 87, 78];
517 let mut down: Cursor<&[u8]> = Cursor::new(b);
518 assert_eq!(
519 StatusChangeType::from_cursor(&mut down, Version::V4).unwrap(),
520 StatusChangeType::Down
521 );
522 }
523
524 #[test]
525 fn serialize() {
526 {
527 let a = &[0, 2, 85, 80];
528 let mut buffer = Vec::new();
529 let mut cursor = Cursor::new(&mut buffer);
530 let up = StatusChangeType::Up;
531 up.serialize(&mut cursor, Version::V4);
532 assert_eq!(buffer, a);
533 }
534
535 {
536 let b = &[0, 4, 68, 79, 87, 78];
537 let mut buffer = Vec::new();
538 let mut cursor = Cursor::new(&mut buffer);
539 let down = StatusChangeType::Down;
540 down.serialize(&mut cursor, Version::V4);
541 assert_eq!(buffer, b);
542 }
543 }
544
545 #[test]
546 fn from_cursor_wrong() {
547 let a = &[0, 1, 78];
548 let mut wrong: Cursor<&[u8]> = Cursor::new(a);
549 let err = StatusChangeType::from_cursor(&mut wrong, Version::V4).unwrap_err();
550
551 assert!(matches!(err, Error::UnexpectedStatusChangeType(_)));
552 }
553}
554
555#[cfg(test)]
556mod schema_change_type_test {
557 use super::*;
558 use crate::frame::traits::FromCursor;
559 use std::io::Cursor;
560
561 #[test]
562 fn from_cursor() {
563 let a = &[0, 7, 67, 82, 69, 65, 84, 69, 68];
564 let mut created: Cursor<&[u8]> = Cursor::new(a);
565 assert_eq!(
566 SchemaChangeType::from_cursor(&mut created, Version::V4).unwrap(),
567 SchemaChangeType::Created
568 );
569
570 let b = &[0, 7, 85, 80, 68, 65, 84, 69, 68];
571 let mut updated: Cursor<&[u8]> = Cursor::new(b);
572 assert_eq!(
573 SchemaChangeType::from_cursor(&mut updated, Version::V4).unwrap(),
574 SchemaChangeType::Updated
575 );
576
577 let c = &[0, 7, 68, 82, 79, 80, 80, 69, 68];
578 let mut dropped: Cursor<&[u8]> = Cursor::new(c);
579 assert_eq!(
580 SchemaChangeType::from_cursor(&mut dropped, Version::V4).unwrap(),
581 SchemaChangeType::Dropped
582 );
583 }
584
585 #[test]
586 fn serialize() {
587 {
588 let a = &[0, 7, 67, 82, 69, 65, 84, 69, 68];
589 let mut buffer = Vec::new();
590 let mut cursor = Cursor::new(&mut buffer);
591 let created = SchemaChangeType::Created;
592 created.serialize(&mut cursor, Version::V4);
593 assert_eq!(buffer, a);
594 }
595 {
596 let b = &[0, 7, 85, 80, 68, 65, 84, 69, 68];
597 let mut buffer = Vec::new();
598 let mut cursor = Cursor::new(&mut buffer);
599 let updated = SchemaChangeType::Updated;
600 updated.serialize(&mut cursor, Version::V4);
601 assert_eq!(buffer, b);
602 }
603
604 {
605 let c = &[0, 7, 68, 82, 79, 80, 80, 69, 68];
606 let mut buffer = Vec::new();
607 let mut cursor = Cursor::new(&mut buffer);
608 let dropped = SchemaChangeType::Dropped;
609 dropped.serialize(&mut cursor, Version::V4);
610 assert_eq!(buffer, c);
611 }
612 }
613
614 #[test]
615 #[should_panic]
616 fn from_cursor_wrong() {
617 let a = &[0, 1, 78];
618 let mut wrong: Cursor<&[u8]> = Cursor::new(a);
619 let _ = SchemaChangeType::from_cursor(&mut wrong, Version::V4).unwrap();
620 }
621}
622
623#[cfg(test)]
624mod schema_change_target_test {
625 use super::*;
626 use crate::frame::traits::FromCursor;
627 use std::io::Cursor;
628
629 #[test]
630 #[allow(clippy::many_single_char_names)]
631 fn schema_change_target() {
632 {
633 let bytes = &[0, 8, 75, 69, 89, 83, 80, 65, 67, 69];
634 let mut keyspace: Cursor<&[u8]> = Cursor::new(bytes);
635 assert_eq!(
636 SchemaChangeTarget::from_cursor(&mut keyspace, Version::V4).unwrap(),
637 SchemaChangeTarget::Keyspace
638 );
639 }
640
641 let b = &[0, 5, 84, 65, 66, 76, 69];
642 let mut table: Cursor<&[u8]> = Cursor::new(b);
643 assert_eq!(
644 SchemaChangeTarget::from_cursor(&mut table, Version::V4).unwrap(),
645 SchemaChangeTarget::Table
646 );
647
648 let c = &[0, 4, 84, 89, 80, 69];
649 let mut _type: Cursor<&[u8]> = Cursor::new(c);
650 assert_eq!(
651 SchemaChangeTarget::from_cursor(&mut _type, Version::V4).unwrap(),
652 SchemaChangeTarget::Type
653 );
654
655 let d = &[0, 8, 70, 85, 78, 67, 84, 73, 79, 78];
656 let mut function: Cursor<&[u8]> = Cursor::new(d);
657 assert_eq!(
658 SchemaChangeTarget::from_cursor(&mut function, Version::V4).unwrap(),
659 SchemaChangeTarget::Function
660 );
661
662 let e = &[0, 9, 65, 71, 71, 82, 69, 71, 65, 84, 69];
663 let mut aggregate: Cursor<&[u8]> = Cursor::new(e);
664 assert_eq!(
665 SchemaChangeTarget::from_cursor(&mut aggregate, Version::V4).unwrap(),
666 SchemaChangeTarget::Aggregate
667 );
668 }
669
670 #[test]
671 fn serialize() {
672 {
673 let a = &[0, 8, 75, 69, 89, 83, 80, 65, 67, 69];
674 let mut buffer = Vec::new();
675 let mut cursor = Cursor::new(&mut buffer);
676 let keyspace = SchemaChangeTarget::Keyspace;
677 keyspace.serialize(&mut cursor, Version::V4);
678 assert_eq!(buffer, a);
679 }
680
681 {
682 let b = &[0, 5, 84, 65, 66, 76, 69];
683 let mut buffer = Vec::new();
684 let mut cursor = Cursor::new(&mut buffer);
685 let table = SchemaChangeTarget::Table;
686 table.serialize(&mut cursor, Version::V4);
687 assert_eq!(buffer, b);
688 }
689
690 {
691 let c = &[0, 4, 84, 89, 80, 69];
692 let mut buffer = Vec::new();
693 let mut cursor = Cursor::new(&mut buffer);
694 let target_type = SchemaChangeTarget::Type;
695 target_type.serialize(&mut cursor, Version::V4);
696 assert_eq!(buffer, c);
697 }
698
699 {
700 let d = &[0, 8, 70, 85, 78, 67, 84, 73, 79, 78];
701 let mut buffer = Vec::new();
702 let mut cursor = Cursor::new(&mut buffer);
703 let function = SchemaChangeTarget::Function;
704 function.serialize(&mut cursor, Version::V4);
705 assert_eq!(buffer, d);
706 }
707
708 {
709 let e = &[0, 9, 65, 71, 71, 82, 69, 71, 65, 84, 69];
710 let mut buffer = Vec::new();
711 let mut cursor = Cursor::new(&mut buffer);
712 let aggregate = SchemaChangeTarget::Aggregate;
713 aggregate.serialize(&mut cursor, Version::V4);
714 assert_eq!(buffer, e);
715 }
716 }
717
718 #[test]
719 #[should_panic]
720 fn from_cursor_wrong() {
721 let a = &[0, 1, 78];
722 let mut wrong: Cursor<&[u8]> = Cursor::new(a);
723 let _ = SchemaChangeTarget::from_cursor(&mut wrong, Version::V4).unwrap();
724 }
725}
726
727#[cfg(test)]
728mod server_event {
729 use super::*;
730
731 #[test]
732 fn topology_change_new_node() {
733 let bytes = &[
734 0, 15, 84, 79, 80, 79, 76, 79, 71, 89, 95, 67, 72, 65, 78, 71, 69, 0, 8, 78, 69, 87, 95, 78, 79, 68, 69, 4, 127, 0, 0, 1, 0, 0, 0, 1, ];
739
740 let expected = ServerEvent::TopologyChange(TopologyChange {
741 change_type: TopologyChangeType::NewNode,
742 addr: "127.0.0.1:1".parse().unwrap(),
743 });
744
745 test_encode_decode(bytes, expected);
746 }
747
748 #[test]
749 fn topology_change_removed_node() {
750 let bytes = &[
751 0, 15, 84, 79, 80, 79, 76, 79, 71, 89, 95, 67, 72, 65, 78, 71, 69,
753 0, 12, 82, 69, 77, 79, 86, 69, 68, 95, 78, 79, 68, 69, 4, 127, 0, 0, 1, 0, 0, 0, 1, ];
757
758 let expected = ServerEvent::TopologyChange(TopologyChange {
759 change_type: TopologyChangeType::RemovedNode,
760 addr: "127.0.0.1:1".parse().unwrap(),
761 });
762
763 test_encode_decode(bytes, expected);
764 }
765
766 #[test]
767 fn status_change_up() {
768 let bytes = &[
769 0, 13, 83, 84, 65, 84, 85, 83, 95, 67, 72, 65, 78, 71, 69, 0, 2, 85, 80, 4, 127, 0, 0, 1, 0, 0, 0, 1, ];
774
775 let expected = ServerEvent::StatusChange(StatusChange {
776 change_type: StatusChangeType::Up,
777 addr: "127.0.0.1:1".parse().unwrap(),
778 });
779
780 test_encode_decode(bytes, expected);
781 }
782
783 #[test]
784 fn status_change_down() {
785 let bytes = &[
786 0, 13, 83, 84, 65, 84, 85, 83, 95, 67, 72, 65, 78, 71, 69, 0, 4, 68, 79, 87, 78, 4, 127, 0, 0, 1, 0, 0, 0, 1, ];
791
792 let expected = ServerEvent::StatusChange(StatusChange {
793 change_type: StatusChangeType::Down,
794 addr: "127.0.0.1:1".parse().unwrap(),
795 });
796
797 test_encode_decode(bytes, expected);
798 }
799
800 #[test]
801 fn schema_change_created() {
802 {
804 let bytes = &[
805 0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, 0, 7, 67, 82, 69, 65, 84, 69, 68, 0, 8, 75, 69, 89, 83, 80, 65, 67, 69, 0, 5, 109, 121, 95, 107, 115,
810 ];
811 let expected = ServerEvent::SchemaChange(SchemaChange {
812 change_type: SchemaChangeType::Created,
813 target: SchemaChangeTarget::Keyspace,
814 options: SchemaChangeOptions::Keyspace("my_ks".to_string()),
815 });
816
817 test_encode_decode(bytes, expected);
818 }
819
820 {
822 let bytes = &[
823 0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, 0, 7, 67, 82, 69, 65, 84, 69, 68, 0, 5, 84, 65, 66, 76, 69, 0, 5, 109, 121, 95, 107, 115, 0, 8, 109, 121, 95, 116, 97, 98, 108, 101,
829 ];
830 let expected = ServerEvent::SchemaChange(SchemaChange {
831 change_type: SchemaChangeType::Created,
832 target: SchemaChangeTarget::Table,
833 options: SchemaChangeOptions::TableType(
834 "my_ks".to_string(),
835 "my_table".to_string(),
836 ),
837 });
838 test_encode_decode(bytes, expected);
839 }
840
841 {
843 let bytes = &[
844 0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, 0, 7, 67, 82, 69, 65, 84, 69, 68, 0, 4, 84, 89, 80, 69, 0, 5, 109, 121, 95, 107, 115, 0, 8, 109, 121, 95, 116, 97, 98, 108, 101,
850 ];
851 let expected = ServerEvent::SchemaChange(SchemaChange {
852 change_type: SchemaChangeType::Created,
853 target: SchemaChangeTarget::Type,
854 options: SchemaChangeOptions::TableType(
855 "my_ks".to_string(),
856 "my_table".to_string(),
857 ),
858 });
859 test_encode_decode(bytes, expected);
860 }
861
862 {
863 let bytes = &[
865 0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, 0, 7, 67, 82, 69, 65, 84, 69, 68, 0, 8, 70, 85, 78, 67, 84, 73, 79, 78, 0, 5, 109, 121, 95, 107, 115, 0, 4, 110, 97, 109, 101, 0, 0,
872 ];
873 let expected = ServerEvent::SchemaChange(SchemaChange {
874 change_type: SchemaChangeType::Created,
875 target: SchemaChangeTarget::Function,
876 options: SchemaChangeOptions::FunctionAggregate(
877 "my_ks".to_string(),
878 "name".to_string(),
879 Vec::new(),
880 ),
881 });
882 test_encode_decode(bytes, expected);
883 }
884
885 {
886 let bytes = &[
888 0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, 0, 7, 67, 82, 69, 65, 84, 69, 68, 0, 9, 65, 71, 71, 82, 69, 71, 65, 84, 69, 0, 5, 109, 121, 95, 107, 115, 0, 4, 110, 97, 109, 101, 0, 0,
895 ];
896 let expected = ServerEvent::SchemaChange(SchemaChange {
897 change_type: SchemaChangeType::Created,
898 target: SchemaChangeTarget::Aggregate,
899 options: SchemaChangeOptions::FunctionAggregate(
900 "my_ks".to_string(),
901 "name".to_string(),
902 Vec::new(),
903 ),
904 });
905 test_encode_decode(bytes, expected);
906 }
907 }
908
909 #[test]
910 fn schema_change_updated() {
911 {
913 let bytes = &[
914 0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, 0, 7, 85, 80, 68, 65, 84, 69, 68, 0, 8, 75, 69, 89, 83, 80, 65, 67, 69, 0, 5, 109, 121, 95, 107, 115,
919 ];
920 let expected = ServerEvent::SchemaChange(SchemaChange {
921 change_type: SchemaChangeType::Updated,
922 target: SchemaChangeTarget::Keyspace,
923 options: SchemaChangeOptions::Keyspace("my_ks".to_string()),
924 });
925 test_encode_decode(bytes, expected);
926 }
927
928 {
930 let bytes = &[
931 0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, 0, 7, 85, 80, 68, 65, 84, 69, 68, 0, 5, 84, 65, 66, 76, 69, 0, 5, 109, 121, 95, 107, 115, 0, 8, 109, 121, 95, 116, 97, 98, 108, 101,
937 ];
938 let expected = ServerEvent::SchemaChange(SchemaChange {
939 change_type: SchemaChangeType::Updated,
940 target: SchemaChangeTarget::Table,
941 options: SchemaChangeOptions::TableType(
942 "my_ks".to_string(),
943 "my_table".to_string(),
944 ),
945 });
946 test_encode_decode(bytes, expected);
947 }
948
949 {
951 let bytes = &[
952 0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, 0, 7, 85, 80, 68, 65, 84, 69, 68, 0, 4, 84, 89, 80, 69, 0, 5, 109, 121, 95, 107, 115, 0, 8, 109, 121, 95, 116, 97, 98, 108, 101,
958 ];
959 let expected = ServerEvent::SchemaChange(SchemaChange {
960 change_type: SchemaChangeType::Updated,
961 target: SchemaChangeTarget::Type,
962 options: SchemaChangeOptions::TableType(
963 "my_ks".to_string(),
964 "my_table".to_string(),
965 ),
966 });
967 test_encode_decode(bytes, expected);
968 }
969
970 {
972 let bytes = &[
973 0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, 0, 7, 85, 80, 68, 65, 84, 69, 68, 0, 8, 70, 85, 78, 67, 84, 73, 79, 78, 0, 5, 109, 121, 95, 107, 115, 0, 4, 110, 97, 109, 101, 0, 0,
980 ];
981 let expected = ServerEvent::SchemaChange(SchemaChange {
982 change_type: SchemaChangeType::Updated,
983 target: SchemaChangeTarget::Function,
984 options: SchemaChangeOptions::FunctionAggregate(
985 "my_ks".to_string(),
986 "name".to_string(),
987 Vec::new(),
988 ),
989 });
990 test_encode_decode(bytes, expected);
991 }
992
993 {
995 let bytes = &[
996 0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, 0, 7, 85, 80, 68, 65, 84, 69, 68, 0, 9, 65, 71, 71, 82, 69, 71, 65, 84, 69, 0, 5, 109, 121, 95, 107, 115, 0, 4, 110, 97, 109, 101, 0, 0,
1003 ];
1004 let expected = ServerEvent::SchemaChange(SchemaChange {
1005 change_type: SchemaChangeType::Updated,
1006 target: SchemaChangeTarget::Aggregate,
1007 options: SchemaChangeOptions::FunctionAggregate(
1008 "my_ks".to_string(),
1009 "name".to_string(),
1010 Vec::new(),
1011 ),
1012 });
1013 test_encode_decode(bytes, expected);
1014 }
1015 }
1016
1017 #[test]
1018 fn schema_change_dropped() {
1019 {
1021 let bytes = &[
1022 0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, 0, 7, 68, 82, 79, 80, 80, 69, 68, 0, 8, 75, 69, 89, 83, 80, 65, 67, 69, 0, 5, 109, 121, 95, 107, 115,
1027 ];
1028 let expected = ServerEvent::SchemaChange(SchemaChange {
1029 change_type: SchemaChangeType::Dropped,
1030 target: SchemaChangeTarget::Keyspace,
1031 options: SchemaChangeOptions::Keyspace("my_ks".to_string()),
1032 });
1033 test_encode_decode(bytes, expected);
1034 }
1035
1036 {
1038 let bytes = &[
1039 0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, 0, 7, 68, 82, 79, 80, 80, 69, 68, 0, 5, 84, 65, 66, 76, 69, 0, 5, 109, 121, 95, 107, 115, 0, 8, 109, 121, 95, 116, 97, 98, 108, 101,
1045 ];
1046 let expected = ServerEvent::SchemaChange(SchemaChange {
1047 change_type: SchemaChangeType::Dropped,
1048 target: SchemaChangeTarget::Table,
1049 options: SchemaChangeOptions::TableType(
1050 "my_ks".to_string(),
1051 "my_table".to_string(),
1052 ),
1053 });
1054 test_encode_decode(bytes, expected);
1055 }
1056
1057 {
1059 let bytes = &[
1060 0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, 0, 7, 68, 82, 79, 80, 80, 69, 68, 0, 4, 84, 89, 80, 69, 0, 5, 109, 121, 95, 107, 115, 0, 8, 109, 121, 95, 116, 97, 98, 108, 101,
1066 ];
1067 let expected = ServerEvent::SchemaChange(SchemaChange {
1068 change_type: SchemaChangeType::Dropped,
1069 target: SchemaChangeTarget::Type,
1070 options: SchemaChangeOptions::TableType(
1071 "my_ks".to_string(),
1072 "my_table".to_string(),
1073 ),
1074 });
1075 test_encode_decode(bytes, expected);
1076 }
1077
1078 {
1080 let bytes = &[
1081 0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, 0, 7, 68, 82, 79, 80, 80, 69, 68, 0, 8, 70, 85, 78, 67, 84, 73, 79, 78, 0, 5, 109, 121, 95, 107, 115, 0, 4, 110, 97, 109, 101, 0, 0,
1088 ];
1089 let expected = ServerEvent::SchemaChange(SchemaChange {
1090 change_type: SchemaChangeType::Dropped,
1091 target: SchemaChangeTarget::Function,
1092 options: SchemaChangeOptions::FunctionAggregate(
1093 "my_ks".to_string(),
1094 "name".to_string(),
1095 Vec::new(),
1096 ),
1097 });
1098 test_encode_decode(bytes, expected);
1099 }
1100
1101 {
1103 let bytes = &[
1104 0, 13, 83, 67, 72, 69, 77, 65, 95, 67, 72, 65, 78, 71, 69, 0, 7, 68, 82, 79, 80, 80, 69, 68, 0, 9, 65, 71, 71, 82, 69, 71, 65, 84, 69, 0, 5, 109, 121, 95, 107, 115, 0, 4, 110, 97, 109, 101, 0, 0,
1111 ];
1112 let expected = ServerEvent::SchemaChange(SchemaChange {
1113 change_type: SchemaChangeType::Dropped,
1114 target: SchemaChangeTarget::Aggregate,
1115 options: SchemaChangeOptions::FunctionAggregate(
1116 "my_ks".to_string(),
1117 "name".to_string(),
1118 Vec::new(),
1119 ),
1120 });
1121 test_encode_decode(bytes, expected);
1122 }
1123 }
1124}