1#![deny(
11 clippy::all,
12 clippy::cargo,
13 clippy::else_if_without_else,
14 clippy::empty_line_after_outer_attr,
15 clippy::multiple_inherent_impl,
16 clippy::mut_mut,
17 clippy::path_buf_push_overwrite
18)]
19#![warn(
20 clippy::cargo_common_metadata,
21 clippy::mutex_integer,
22 clippy::needless_borrow,
23 clippy::similar_names
24)]
25#![allow(clippy::multiple_crate_versions)]
26
27pub mod naming_utils;
28
29use crate::naming_utils::NameUtils;
30use derive_more::{Display, From};
31use encoding_rs::mem;
32use im::HashMap as ImHashMap;
33use im::OrdMap;
34use lazy_static::lazy_static;
35use murmurhash3::murmurhash3_x64_128;
36use ordered_float::OrderedFloat;
37use regex::Regex;
38use serde::{Deserialize, Serialize};
39use snafu::Snafu;
40use std::cmp::{min, Reverse};
41use std::collections::{BTreeMap, HashMap};
42use std::convert::From;
43use std::fmt;
44use std::fmt::{Debug, Write};
45use std::fmt::{Display, Formatter};
46use std::net::{SocketAddr, ToSocketAddrs};
47use std::ops::Index;
48use std::vec;
49use tokio_rustls::rustls;
50use tokio_rustls::rustls::{RootCertStore, ServerCertVerified, ServerCertVerifier, TLSError};
51use tokio_rustls::webpki::DNSNameRef;
52use uuid::Uuid;
53
54#[macro_use]
55extern crate shrinkwraprs;
56
57#[macro_use]
58extern crate derive_new;
59
60#[macro_use]
61extern crate num_derive;
62
63type PravegaNodeUriParseResult<T> = std::result::Result<T, PravegaNodeUriParseError>;
64
65#[derive(Debug, Snafu)]
66pub enum PravegaNodeUriParseError {
67 #[snafu(display("Could not parse uri due to {}", error_msg))]
68 ParseError { error_msg: String },
69}
70
71#[derive(Debug, PartialEq, Default)]
72struct PravegaNodeUriParts {
73 scheme: Option<String>,
74 domain_name: Option<String>,
75 port: Option<u16>,
76}
77
78#[derive(From, Shrinkwrap, Debug, Clone, Hash, PartialEq, Eq)]
79pub struct PravegaNodeUri(pub String);
80
81impl From<&str> for PravegaNodeUri {
82 fn from(endpoint: &str) -> Self {
83 PravegaNodeUri(endpoint.to_owned())
84 }
85}
86
87impl From<(&str, u16)> for PravegaNodeUri {
88 fn from(tuple: (&str, u16)) -> Self {
89 let endpoint = format!("{}:{}", tuple.0, tuple.1);
90 PravegaNodeUri(endpoint)
91 }
92}
93
94impl From<SocketAddr> for PravegaNodeUri {
95 fn from(socket_addr: SocketAddr) -> Self {
96 PravegaNodeUri(socket_addr.to_string())
97 }
98}
99
100impl PravegaNodeUri {
101 pub fn to_socket_addr(&self) -> SocketAddr {
102 match PravegaNodeUri::uri_parts_from_string(self.to_string()) {
104 Ok(uri_parts) => {
105 let mut addrs_vec: Vec<_> =
106 format!("{}:{}", uri_parts.domain_name.unwrap(), uri_parts.port.unwrap())
107 .to_socket_addrs()
108 .expect("Unable to resolve domain")
109 .collect();
110 addrs_vec.pop().expect("get the first SocketAddr")
111 }
112 Err(e) => panic!("{}", e),
113 }
114 }
115
116 pub fn domain_name(&self) -> String {
117 match PravegaNodeUri::uri_parts_from_string(self.to_string()) {
118 Ok(uri_parts) => uri_parts.domain_name.expect("uri missing domain name"),
119 Err(e) => panic!("{}", e),
120 }
121 }
122
123 pub fn port(&self) -> u16 {
124 match PravegaNodeUri::uri_parts_from_string(self.to_string()) {
125 Ok(uri_parts) => uri_parts.port.expect("parse port to u16"),
126 Err(e) => panic!("{}", e),
127 }
128 }
129
130 pub fn scheme(&self) -> PravegaNodeUriParseResult<String> {
132 match PravegaNodeUri::uri_parts_from_string(self.to_string()) {
133 Ok(sa) => match sa.scheme {
134 Some(scheme) => Ok(scheme),
135 _ => Ok("".to_string()),
136 },
137 Err(e) => Err(e),
138 }
139 }
140
141 pub fn is_well_formed(uri: String) -> bool {
144 match PravegaNodeUri::uri_parts_from_string(uri) {
145 Ok(uri_parts) => uri_parts.port.is_some() && uri_parts.domain_name.is_some(),
146 Err(_) => false,
147 }
148 }
149
150 fn uri_parts_from_string(uri: String) -> PravegaNodeUriParseResult<PravegaNodeUriParts> {
151 lazy_static! {
152 static ref URI_RE: Regex = Regex::new(
153 r"(?x)
154 (?:(?P<scheme>[[:alnum:]]+)://)?
155 (?P<domain_name>([0-9A-Za-z\-\.]+|\[[0-9A-F\.:]+\]))
156 :
157 (?P<port>[[:digit:]]+)"
158 )
159 .unwrap();
160 }
161 let mut uri_parts: PravegaNodeUriParts = PravegaNodeUriParts::default();
162
163 let first_endpoint = match URI_RE.captures_iter(&uri).next() {
171 Some(endpoint) => endpoint,
172 _ => {
173 return Err(PravegaNodeUriParseError::ParseError {
174 error_msg: format!("malformed uri {}", uri),
175 })
176 }
177 };
178
179 uri_parts.domain_name = Some(first_endpoint.name("domain_name").unwrap().as_str().to_string());
180 uri_parts.port = Some(
181 first_endpoint
182 .name("port")
183 .unwrap()
184 .as_str()
185 .parse::<u16>()
186 .expect("port not a valid u16"),
187 );
188 uri_parts.scheme = first_endpoint
189 .name("scheme")
190 .map(|scheme| scheme.as_str().to_string());
191 Ok(uri_parts)
192 }
193}
194
195#[derive(new, Debug, Clone, Hash, PartialEq, Eq)]
196pub struct DelegationToken {
197 value: String,
198 expiry_time: Option<u64>,
199}
200
201impl DelegationToken {
202 pub fn get_value(&self) -> String {
203 self.value.clone()
204 }
205
206 pub fn get_expiry_time(&self) -> Option<u64> {
207 self.expiry_time
208 }
209}
210
211#[derive(From, Shrinkwrap, Debug, Clone, Hash, PartialEq, Eq)]
212pub struct Timestamp(pub u64);
213
214#[derive(From, Shrinkwrap, Debug, Display, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
215pub struct Scope {
216 pub name: String,
217}
218
219#[derive(From, Shrinkwrap, Debug, Display, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
220pub struct Stream {
221 pub name: String,
222}
223
224#[derive(From, Shrinkwrap, Debug, Display, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
225pub struct Reader {
226 pub name: String,
227}
228
229#[derive(Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
230pub struct Segment {
231 pub number: i64,
232 pub tx_id: Option<TxId>,
233}
234
235impl Segment {
236 pub fn from(number: i64) -> Self {
237 Segment { number, tx_id: None }
238 }
239
240 pub fn from_id_and_epoch(segment_id: i32, epoch: i32) -> Self {
241 let epoch_i64 = (epoch as i64) << 32;
242 let id_i64 = segment_id as i64;
243 Segment {
244 number: epoch_i64 + id_i64,
245 tx_id: None,
246 }
247 }
248
249 pub fn from_txn(number: i64, tx_id: TxId) -> Self {
250 Segment {
251 number,
252 tx_id: Some(tx_id),
253 }
254 }
255
256 pub fn is_transaction_segment(&self) -> bool {
257 self.tx_id.is_some()
258 }
259
260 pub fn get_epoch(&self) -> i32 {
261 (self.number >> 32) as i32
262 }
263
264 pub fn get_segment_number(&self) -> i32 {
265 self.number as i32
266 }
267}
268
269impl fmt::Debug for Segment {
270 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271 f.debug_struct("Segment")
272 .field("segment", &self.get_segment_number())
273 .field("epoch", &self.get_epoch())
274 .finish()
275 }
276}
277
278#[derive(new, Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
279pub struct ScopedStream {
280 pub scope: Scope,
281 pub stream: Stream,
282}
283
284impl From<&ScopedSegment> for ScopedStream {
285 fn from(scoped_segment: &ScopedSegment) -> Self {
286 ScopedStream {
287 scope: scoped_segment.scope.clone(),
288 stream: scoped_segment.stream.clone(),
289 }
290 }
291}
292
293impl From<&str> for ScopedStream {
294 fn from(string: &str) -> Self {
295 let buf = string.split('/').collect::<Vec<&str>>();
296 ScopedStream {
297 scope: Scope {
298 name: buf[0].to_string(),
299 },
300 stream: Stream {
301 name: buf[1].to_string(),
302 },
303 }
304 }
305}
306
307#[derive(new, Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
312pub struct CToken {
313 pub token: String,
314}
315
316impl CToken {
317 pub fn empty() -> CToken {
318 CToken {
319 token: String::from(""),
320 }
321 }
322}
323
324impl From<&str> for CToken {
325 fn from(string: &str) -> Self {
326 CToken {
327 token: string.to_string(),
328 }
329 }
330}
331
332#[derive(new, Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
333pub struct ScopedSegment {
334 pub scope: Scope,
335 pub stream: Stream,
336 pub segment: Segment,
337}
338
339impl ScopedSegment {
340 pub fn get_scoped_stream(&self) -> ScopedStream {
341 ScopedStream::new(self.scope.clone(), self.stream.clone())
342 }
343}
344
345impl From<&str> for ScopedSegment {
346 fn from(qualified_name: &str) -> Self {
347 if NameUtils::is_transaction_segment(qualified_name) {
348 let original_segment_name = NameUtils::get_parent_stream_segment_name(qualified_name);
349 ScopedSegment::from(original_segment_name)
350 } else {
351 let mut tokens = NameUtils::extract_segment_tokens(qualified_name.to_owned());
352 let segment_id = tokens.pop().expect("get segment id from tokens");
353 let stream_name = tokens.pop().expect("get stream name from tokens");
354
355 if tokens.is_empty() {
356 ScopedSegment {
358 scope: Scope {
359 name: String::from(""),
360 },
361 stream: Stream { name: stream_name },
362 segment: Segment {
363 number: segment_id.parse::<i64>().expect("parse string to i64"),
364 tx_id: None,
365 },
366 }
367 } else {
368 let scope = tokens.pop().expect("get scope from tokens");
369 ScopedSegment {
370 scope: Scope { name: scope },
371 stream: Stream { name: stream_name },
372 segment: Segment {
373 number: segment_id.parse::<i64>().expect("parse string to i64"),
374 tx_id: None,
375 },
376 }
377 }
378 }
379 }
380}
381
382#[derive(From, Shrinkwrap, Copy, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
383pub struct TxId(pub u128);
384
385impl TxId {
386 pub fn get_epoch(&self) -> i32 {
390 (self.0 >> 96) as i32
391 }
392}
393
394impl Display for TxId {
395 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
396 f.write_str(Uuid::from_u128(self.0).to_hyphenated().to_string().as_str())?;
397 Ok(())
398 }
399}
400
401impl fmt::Debug for TxId {
402 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403 f.write_str(Uuid::from_u128(self.0).to_hyphenated().to_string().as_str())?;
404 Ok(())
405 }
406}
407
408impl Display for ScopedStream {
409 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
410 f.write_str(&self.scope.name)?;
411 f.write_char('/')?;
412 f.write_str(&self.stream.name)?;
413 Ok(())
414 }
415}
416
417impl Display for ScopedSegment {
418 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
419 f.write_str(&NameUtils::get_qualified_stream_segment_name(
420 &self.scope.name,
421 &self.stream.name,
422 self.segment.number,
423 self.segment.tx_id,
424 ))?;
425 Ok(())
426 }
427}
428
429#[derive(From, Shrinkwrap, Copy, Clone, Hash, PartialEq, Eq)]
430pub struct WriterId(pub u128);
431
432impl Debug for WriterId {
433 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
434 write!(f, "{:x}", self.0)
435 }
436}
437
438impl Display for WriterId {
439 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
440 write!(f, "{:x}", self.0)
441 }
442}
443
444#[derive(Debug, Clone, Hash, PartialEq, Eq, FromPrimitive)]
445pub enum ScaleType {
446 FixedNumSegments = 0,
447 ByRateInKbytesPerSec = 1,
448 ByRateInEventsPerSec = 2,
449}
450
451#[derive(Debug, Clone, Hash, PartialEq, Eq)]
452pub struct Scaling {
453 pub scale_type: ScaleType,
454 pub target_rate: i32,
455 pub scale_factor: i32,
456 pub min_num_segments: i32,
457}
458
459impl Default for Scaling {
460 fn default() -> Self {
461 Scaling {
462 scale_type: ScaleType::FixedNumSegments,
463 min_num_segments: 1,
464 scale_factor: 1,
465 target_rate: 0,
466 }
467 }
468}
469
470#[derive(Debug, Clone, Hash, PartialEq, Eq, FromPrimitive)]
471pub enum RetentionType {
472 None = 0,
473 Time = 1,
474 Size = 2,
475}
476
477#[derive(Debug, Clone, Hash, PartialEq, Eq)]
478pub enum PingStatus {
479 Ok = 0,
480 Committed = 1,
481 Aborted = 2,
482}
483
484#[derive(Debug, Clone, Hash, PartialEq, Eq)]
485pub enum TransactionStatus {
486 Open = 0,
487 Committing = 1,
488 Committed = 2,
489 Aborting = 3,
490 Aborted = 4,
491}
492
493#[derive(Debug, Clone, Hash, PartialEq, Eq)]
494pub struct Retention {
495 pub retention_type: RetentionType,
496 pub retention_param: i64,
497}
498
499impl Default for Retention {
500 fn default() -> Self {
501 Retention {
502 retention_type: RetentionType::None,
503 retention_param: 0,
504 }
505 }
506}
507
508#[derive(new, Debug, Clone, Hash, PartialEq, Eq)]
509pub struct StreamConfiguration {
510 pub scoped_stream: ScopedStream,
511 pub scaling: Scaling,
512 pub retention: Retention,
513 pub tags: Option<Vec<String>>,
514}
515
516#[derive(new, Debug, Clone)]
517pub struct StreamCut {
518 pub scoped_stream: ScopedStream,
519 pub segment_offset_map: HashMap<i64, i64>,
520}
521
522const PREFIX_LENGTH: usize = 2;
523
524#[derive(new, Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
525pub struct SegmentWithRange {
526 pub scoped_segment: ScopedSegment,
527 pub min_key: OrderedFloat<f64>,
528 pub max_key: OrderedFloat<f64>,
529}
530
531impl SegmentWithRange {
532 pub fn get_segment(&self) -> Segment {
533 self.scoped_segment.segment.clone()
534 }
535}
536
537impl fmt::Display for SegmentWithRange {
538 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
539 let segment_str = self.scoped_segment.to_string();
540 write!(
541 f,
542 "{:02}{}{}-{}",
543 segment_str.len(),
544 segment_str,
545 self.min_key,
546 self.max_key
547 )
548 }
549}
550
551impl From<&str> for SegmentWithRange {
552 fn from(name: &str) -> Self {
553 let segment_name_length: usize = name[..PREFIX_LENGTH].parse().expect("parse prefix length");
554
555 let segment_str = &*name[PREFIX_LENGTH..PREFIX_LENGTH + segment_name_length]
556 .parse::<String>()
557 .expect("parse segment name");
558
559 let scoped_segment: ScopedSegment = segment_str.into();
560
561 let rest_string = name[PREFIX_LENGTH + segment_name_length..]
562 .parse::<String>()
563 .expect("parse segment name");
564
565 let mut parts: Vec<&str> = rest_string.split('-').collect();
566 let max_key = parts
567 .pop()
568 .expect("get max key")
569 .parse::<OrderedFloat<f64>>()
570 .expect("parse OrderedFloat from str");
571 let min_key = parts
572 .pop()
573 .expect("get max key")
574 .parse::<OrderedFloat<f64>>()
575 .expect("parse OrderedFloat from str");
576
577 SegmentWithRange {
578 scoped_segment,
579 min_key,
580 max_key,
581 }
582 }
583}
584
585#[derive(Debug, Clone, Hash, PartialEq, Eq)]
586pub struct StreamSegments {
587 pub key_segment_map: OrdMap<OrderedFloat<f64>, SegmentWithRange>,
588}
589
590impl StreamSegments {
591 const SEED: u64 = 1741865571; pub fn new(map_key_segment: BTreeMap<OrderedFloat<f64>, SegmentWithRange>) -> StreamSegments {
594 StreamSegments::assert_valid(&map_key_segment);
595 StreamSegments {
596 key_segment_map: map_key_segment.into(),
597 }
598 }
599
600 fn assert_valid(map: &BTreeMap<OrderedFloat<f64>, SegmentWithRange>) {
601 if !map.is_empty() {
602 let (min_key, _min_seg) = map.iter().next().expect("Error reading min key");
603 let (max_key, _max_seg) = map.iter().next_back().expect("Error read max key");
604 assert!(
605 min_key.gt(&OrderedFloat(0.0)),
606 "Min key is expected to be greater than 0.0"
607 );
608 assert!(max_key.ge(&OrderedFloat(1.0)), "Last Key is missing");
609 assert!(
610 max_key.lt(&OrderedFloat(1.0001)),
611 "Segments should have values only up to 1.0"
612 );
613 }
614 }
615
616 pub fn get_segment_for_routing_key(
618 &self,
619 routing_key: &Option<String>,
620 rand_f64: fn() -> f64,
621 ) -> &ScopedSegment {
622 if let Some(key) = routing_key {
623 self.get_segment_for_string(key)
624 } else {
625 self.get_segment(rand_f64())
626 }
627 }
628
629 pub fn get_segment(&self, key: f64) -> &ScopedSegment {
630 assert!(OrderedFloat(key).ge(&OrderedFloat(0.0)), "Key should be >= 0.0");
631 assert!(OrderedFloat(key).le(&OrderedFloat(1.0)), "Key should be <= 1.0");
632 let r = self
633 .key_segment_map
634 .get_next(&OrderedFloat(key))
635 .expect("No matching segment found for the given key");
636 &r.1.scoped_segment
637 }
638
639 pub fn get_segment_for_string(&self, str: &str) -> &ScopedSegment {
640 let mut buffer_u16 = vec![0; str.len()];
641
642 mem::convert_str_to_utf16(str, &mut buffer_u16);
644
645 let (prefix, buffer_u8, suffix) = unsafe { buffer_u16.align_to::<u8>() };
647 assert!(prefix.is_empty());
648 assert!(suffix.is_empty());
649
650 let (upper, _lower) = murmurhash3_x64_128(buffer_u8, StreamSegments::SEED);
651
652 let key = u64_to_f64_fraction(upper);
654 self.get_segment(key)
655 }
656
657 pub fn get_segments(&self) -> Vec<ScopedSegment> {
658 self.key_segment_map
659 .values()
660 .map(|v| v.scoped_segment.to_owned())
661 .collect::<Vec<ScopedSegment>>()
662 }
663
664 pub fn apply_replacement_range(
665 &self,
666 segment_replace: &Segment,
667 replacement_ranges: &StreamSegmentsWithPredecessors,
668 ) -> Result<StreamSegments, String> {
669 let mut replaced_ranges = replacement_ranges
670 .replacement_segments
671 .get(segment_replace)
672 .unwrap_or_else(|| panic!("Empty set of replacements"))
673 .clone();
674
675 replaced_ranges.sort_by_key(|k| Reverse(k.max_key));
676 let replaced_ranges_ref = &replaced_ranges;
677 StreamSegments::verify_continuous(replaced_ranges_ref)
678 .expect("Replacement ranges are not continuous");
679
680 let mut result: BTreeMap<OrderedFloat<f64>, SegmentWithRange> = BTreeMap::new();
681 for (key, seg) in self.key_segment_map.iter().rev() {
682 if segment_replace.number == seg.scoped_segment.segment.number {
683 for new_segment in replaced_ranges_ref {
685 let lower_bound = self.key_segment_map.range(..key).next_back();
686 match lower_bound {
687 None => {
688 result.insert(min(new_segment.max_key, *key), new_segment.clone());
689 }
690 Some(lower_bound_value) => {
691 if new_segment.max_key.ge(lower_bound_value.0) {
692 result.insert(min(new_segment.max_key, *key), new_segment.clone());
693 }
694 }
695 };
696 }
697 } else {
698 result.insert(*key, seg.clone());
699 }
700 }
701
702 Ok(StreamSegments::new(result))
703 }
704
705 fn verify_continuous(segment_replace_ranges: &[SegmentWithRange]) -> Result<(), String> {
706 let mut previous = segment_replace_ranges.index(0).max_key;
707 for x in segment_replace_ranges {
708 if x.max_key.0.ne(&previous.0) {
709 return Err("Replacement segments are not continuous".to_string());
710 }
711 previous = x.min_key;
712 }
713 Ok(())
714 }
715}
716
717#[derive(new, Debug, Clone, Hash, PartialEq, Eq)]
718pub struct TxnSegments {
719 pub stream_segments: StreamSegments,
720 pub tx_id: TxId,
721}
722
723#[derive(Debug, Clone, Hash, PartialEq, Eq)]
724pub struct StreamSegmentsWithPredecessors {
725 pub segment_with_predecessors: ImHashMap<SegmentWithRange, Vec<Segment>>,
726 pub replacement_segments: ImHashMap<Segment, Vec<SegmentWithRange>>, }
728
729impl StreamSegmentsWithPredecessors {
730 pub fn new(
731 segment_with_predecessor: ImHashMap<SegmentWithRange, Vec<Segment>>,
732 ) -> StreamSegmentsWithPredecessors {
733 let mut replacement_map: HashMap<Segment, Vec<SegmentWithRange>> = HashMap::new();
734 for (segment, predecessor) in &segment_with_predecessor {
735 for predecessor_segment in predecessor {
736 let predecessor = predecessor_segment.clone();
737 let mut replacement_segments = replacement_map
738 .get(&predecessor)
739 .get_or_insert(&Vec::new())
740 .clone();
741 replacement_segments.push((*segment).clone());
742 replacement_map.insert(predecessor, replacement_segments.to_vec());
743 }
744 }
745 StreamSegmentsWithPredecessors {
746 segment_with_predecessors: segment_with_predecessor,
747 replacement_segments: replacement_map.into(), }
749 }
750
751 pub fn is_stream_sealed(&self) -> bool {
754 self.segment_with_predecessors.is_empty()
755 }
756}
757
758pub(crate) fn u64_to_f64_fraction(hash: u64) -> f64 {
760 let shifted = (hash >> 12) & 0x000f_ffff_ffff_ffff_u64;
761 f64::from_bits(0x3ff0_0000_0000_0000_u64 + shifted) - 1.0
762}
763
764#[derive(new, Debug, Clone, Hash, PartialEq, Eq)]
765pub struct EventRead {
766 pub event: Vec<u8>,
767}
768
769#[derive(new, Debug, Clone, Hash, PartialEq, Eq)]
771pub struct SegmentInfo {
772 pub segment: ScopedSegment,
774
775 pub starting_offset: i64,
780
781 pub write_offset: i64,
784
785 pub is_sealed: bool,
787
788 pub last_modified_time: i64,
790}
791
792pub struct NoVerifier;
794
795impl ServerCertVerifier for NoVerifier {
796 fn verify_server_cert(
797 &self,
798 _roots: &RootCertStore,
799 _presented_certs: &[rustls::Certificate],
800 _dns_name: DNSNameRef,
801 _ocsp_response: &[u8],
802 ) -> Result<ServerCertVerified, TLSError> {
803 Ok(ServerCertVerified::assertion())
804 }
805}
806
807#[cfg(test)]
808mod test {
809 use super::*;
810 use std::convert::From;
811 use std::net::{IpAddr, Ipv4Addr};
812
813 #[test]
814 fn test_hash() {
815 let s = "hello";
816 let mut buffer_u16 = vec![0; s.len()];
817
818 mem::convert_str_to_utf16(s, &mut buffer_u16);
819
820 let (prefix, buffer_u8, suffix) = unsafe { buffer_u16.align_to::<u8>() };
821 assert!(prefix.is_empty());
822 assert!(suffix.is_empty());
823
824 let (upper, _lower) = murmurhash3_x64_128(&buffer_u8, StreamSegments::SEED);
825 assert_eq!(u64_to_f64_fraction(upper), 0.658716230571337);
826 }
827
828 #[test]
829 fn test_segment_with_range() {
830 let segment = SegmentWithRange {
831 scoped_segment: ScopedSegment {
832 scope: Scope::from("scope".to_owned()),
833 stream: Stream::from("stream".to_owned()),
834 segment: Segment::from(0),
835 },
836 min_key: OrderedFloat::from(0.0),
837 max_key: OrderedFloat::from(1.0),
838 };
839
840 let segment_string = &*segment.to_string();
841
842 let segment_from_string: SegmentWithRange = segment_string.into();
843
844 assert_eq!(segment_from_string, segment);
845 }
846
847 #[test]
848 fn test_pravega_node_uri() {
849 let uri = PravegaNodeUri("127.0.0.1:9090".to_string());
850 assert_eq!(PravegaNodeUri::from("127.0.0.1:9090"), uri);
851 assert_eq!(PravegaNodeUri::from(("127.0.0.1", 9090)), uri);
852 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9090);
853 assert_eq!(PravegaNodeUri::from(socket_addr), uri);
854 assert_eq!(uri.domain_name(), "127.0.0.1".to_string());
855 assert_eq!(uri.port(), 9090);
856 assert_eq!(uri.to_socket_addr(), socket_addr);
857
858 let uri = PravegaNodeUri("localhost:9090".to_string());
859 assert_eq!(PravegaNodeUri::from("localhost:9090"), uri);
860 assert_eq!(PravegaNodeUri::from(("localhost", 9090)), uri);
861 assert_eq!(uri.domain_name(), "localhost".to_string());
862 assert_eq!(uri.port(), 9090);
863 assert_eq!(uri.to_socket_addr(), socket_addr);
864
865 assert_eq!(
866 PravegaNodeUri::uri_parts_from_string("127.0.0.1:9090".to_string()).unwrap(),
867 PravegaNodeUriParts {
868 scheme: None,
869 domain_name: Some("127.0.0.1".to_string()),
870 port: Some(9090)
871 }
872 );
873
874 let uri_with_scheme = PravegaNodeUri("tls://127.0.0.1:9090".to_string());
875 assert_eq!(
876 PravegaNodeUri::uri_parts_from_string(uri_with_scheme.to_string()).unwrap(),
877 PravegaNodeUriParts {
878 scheme: Some("tls".to_string()),
879 domain_name: Some("127.0.0.1".to_string()),
880 port: Some(9090),
881 }
882 );
883 assert!(PravegaNodeUri::is_well_formed(uri_with_scheme.to_string()));
884 assert_eq!(uri_with_scheme.port(), 9090);
885
886 let uri_with_scheme =
888 PravegaNodeUri("ssl://127.0.0.1:9090,127.0.0.1:9091,127.0.0.1:9092".to_string());
889 assert_eq!(
890 PravegaNodeUri::uri_parts_from_string(uri_with_scheme.to_string()).unwrap(),
891 PravegaNodeUriParts {
892 scheme: Some("ssl".to_string()),
893 domain_name: Some("127.0.0.1".to_string()),
894 port: Some(9090),
895 }
896 );
897
898 assert!(PravegaNodeUri::uri_parts_from_string("tls://127.0.0.1://9090".into()).is_err());
899 assert!(!PravegaNodeUri::is_well_formed("tls://127.0.0.1://9090".into()));
900 assert!(PravegaNodeUri("tls://127.0.0.1://9090".to_string())
901 .scheme()
902 .is_err());
903 assert_eq!(
904 PravegaNodeUri("tls://127.0.0.1:9090".to_string())
905 .scheme()
906 .unwrap(),
907 "tls".to_string()
908 );
909 assert!(PravegaNodeUri("".to_string()).scheme().is_err());
910
911 assert!(
913 PravegaNodeUri("tcps://[1762:0:0:0:0:B03:1:AF18]:12345".to_string())
914 .scheme()
915 .is_ok()
916 );
917
918 assert_eq!(
919 PravegaNodeUri::uri_parts_from_string("tcps://[::1]:12345".to_string()).unwrap(),
920 PravegaNodeUriParts {
921 scheme: Some("tcps".to_string()),
922 domain_name: Some("[::1]".to_string()),
923 port: Some(12345),
924 }
925 );
926 }
927
928 #[test]
929 fn test_scoped_segment() {
930 let seg1 = ScopedSegment::from("test/123.#epoch.0");
931 assert_eq!(
932 seg1.stream,
933 Stream {
934 name: "test".to_string()
935 }
936 );
937 assert_eq!(
938 seg1.segment,
939 Segment {
940 number: 123,
941 tx_id: None
942 }
943 );
944 assert_eq!(seg1.to_string(), "/test/123.#epoch.0");
945
946 let seg2 = ScopedSegment::from("scope/test/123");
947 assert_eq!(
948 seg2.scope,
949 Scope {
950 name: "scope".to_string()
951 }
952 );
953 assert_eq!(
954 seg1.stream,
955 Stream {
956 name: "test".to_string()
957 }
958 );
959 assert_eq!(
960 seg1.segment,
961 Segment {
962 number: 123,
963 tx_id: None
964 }
965 );
966 assert_eq!(seg2.to_string(), "scope/test/123.#epoch.0");
967 }
968
969 #[test]
970 fn test_scoped_stream() {
971 let stream = ScopedStream {
972 scope: Scope {
973 name: "scope".to_string(),
974 },
975 stream: Stream {
976 name: "stream".to_string(),
977 },
978 };
979 let segment = ScopedSegment::from("scope/stream/123.#epoch.0");
980 let derived_stream = ScopedStream::from(&segment);
981 assert_eq!(stream, derived_stream);
982 }
983}