pravega_client_shared/
lib.rs

1/*
2 * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 */
10#![deny(
11    clippy::all,
12    clippy::cargo,
13    clippy::else_if_without_else,
14    clippy::empty_line_after_outer_attr,
15    clippy::multiple_inherent_impl,
16    clippy::mut_mut,
17    clippy::path_buf_push_overwrite
18)]
19#![warn(
20    clippy::cargo_common_metadata,
21    clippy::mutex_integer,
22    clippy::needless_borrow,
23    clippy::similar_names
24)]
25#![allow(clippy::multiple_crate_versions)]
26
27pub mod naming_utils;
28
29use crate::naming_utils::NameUtils;
30use derive_more::{Display, From};
31use encoding_rs::mem;
32use im::HashMap as ImHashMap;
33use im::OrdMap;
34use lazy_static::lazy_static;
35use murmurhash3::murmurhash3_x64_128;
36use ordered_float::OrderedFloat;
37use regex::Regex;
38use serde::{Deserialize, Serialize};
39use snafu::Snafu;
40use std::cmp::{min, Reverse};
41use std::collections::{BTreeMap, HashMap};
42use std::convert::From;
43use std::fmt;
44use std::fmt::{Debug, Write};
45use std::fmt::{Display, Formatter};
46use std::net::{SocketAddr, ToSocketAddrs};
47use std::ops::Index;
48use std::vec;
49use tokio_rustls::rustls;
50use tokio_rustls::rustls::{RootCertStore, ServerCertVerified, ServerCertVerifier, TLSError};
51use tokio_rustls::webpki::DNSNameRef;
52use uuid::Uuid;
53
54#[macro_use]
55extern crate shrinkwraprs;
56
57#[macro_use]
58extern crate derive_new;
59
60#[macro_use]
61extern crate num_derive;
62
63type PravegaNodeUriParseResult<T> = std::result::Result<T, PravegaNodeUriParseError>;
64
65#[derive(Debug, Snafu)]
66pub enum PravegaNodeUriParseError {
67    #[snafu(display("Could not parse uri due to {}", error_msg))]
68    ParseError { error_msg: String },
69}
70
71#[derive(Debug, PartialEq, Default)]
72struct PravegaNodeUriParts {
73    scheme: Option<String>,
74    domain_name: Option<String>,
75    port: Option<u16>,
76}
77
78#[derive(From, Shrinkwrap, Debug, Clone, Hash, PartialEq, Eq)]
79pub struct PravegaNodeUri(pub String);
80
81impl From<&str> for PravegaNodeUri {
82    fn from(endpoint: &str) -> Self {
83        PravegaNodeUri(endpoint.to_owned())
84    }
85}
86
87impl From<(&str, u16)> for PravegaNodeUri {
88    fn from(tuple: (&str, u16)) -> Self {
89        let endpoint = format!("{}:{}", tuple.0, tuple.1);
90        PravegaNodeUri(endpoint)
91    }
92}
93
94impl From<SocketAddr> for PravegaNodeUri {
95    fn from(socket_addr: SocketAddr) -> Self {
96        PravegaNodeUri(socket_addr.to_string())
97    }
98}
99
100impl PravegaNodeUri {
101    pub fn to_socket_addr(&self) -> SocketAddr {
102        // to_socket_addrs will resolve hostname to ip address
103        match PravegaNodeUri::uri_parts_from_string(self.to_string()) {
104            Ok(uri_parts) => {
105                let mut addrs_vec: Vec<_> =
106                    format!("{}:{}", uri_parts.domain_name.unwrap(), uri_parts.port.unwrap())
107                        .to_socket_addrs()
108                        .expect("Unable to resolve domain")
109                        .collect();
110                addrs_vec.pop().expect("get the first SocketAddr")
111            }
112            Err(e) => panic!("{}", e),
113        }
114    }
115
116    pub fn domain_name(&self) -> String {
117        match PravegaNodeUri::uri_parts_from_string(self.to_string()) {
118            Ok(uri_parts) => uri_parts.domain_name.expect("uri missing domain name"),
119            Err(e) => panic!("{}", e),
120        }
121    }
122
123    pub fn port(&self) -> u16 {
124        match PravegaNodeUri::uri_parts_from_string(self.to_string()) {
125            Ok(uri_parts) => uri_parts.port.expect("parse port to u16"),
126            Err(e) => panic!("{}", e),
127        }
128    }
129
130    /// Return Result of the uri scheme or empty string if no scheme was specified
131    pub fn scheme(&self) -> PravegaNodeUriParseResult<String> {
132        match PravegaNodeUri::uri_parts_from_string(self.to_string()) {
133            Ok(sa) => match sa.scheme {
134                Some(scheme) => Ok(scheme),
135                _ => Ok("".to_string()),
136            },
137            Err(e) => Err(e),
138        }
139    }
140
141    /// verifies the uri is well-formed (contains at least host:port, with optional scheme:// prefix)
142    ///
143    pub fn is_well_formed(uri: String) -> bool {
144        match PravegaNodeUri::uri_parts_from_string(uri) {
145            Ok(uri_parts) => uri_parts.port.is_some() && uri_parts.domain_name.is_some(),
146            Err(_) => false,
147        }
148    }
149
150    fn uri_parts_from_string(uri: String) -> PravegaNodeUriParseResult<PravegaNodeUriParts> {
151        lazy_static! {
152            static ref URI_RE: Regex = Regex::new(
153                r"(?x)
154            (?:(?P<scheme>[[:alnum:]]+)://)?
155            (?P<domain_name>([0-9A-Za-z\-\.]+|\[[0-9A-F\.:]+\]))
156            :
157            (?P<port>[[:digit:]]+)"
158            )
159            .unwrap();
160        }
161        let mut uri_parts: PravegaNodeUriParts = PravegaNodeUriParts::default();
162
163        // The Java client supports multiple comma separated endpoints in a single string
164        // where the first endpoint has the scheme to be applied to all endpoints.
165        // To be semi-compatible with the Java code this method accepts a string containing comma
166        // separated (or any separator) endpoints, but it uses only the first endpoint in the string.
167        // The domain name portion can be an ip name, ipv4 address or an ipv6 literal
168        // ipv6 addresses retain [] wrapper because both to_socket_addrs() and
169        // http URI from_str() require IP literals
170        let first_endpoint = match URI_RE.captures_iter(&uri).next() {
171            Some(endpoint) => endpoint,
172            _ => {
173                return Err(PravegaNodeUriParseError::ParseError {
174                    error_msg: format!("malformed uri {}", uri),
175                })
176            }
177        };
178
179        uri_parts.domain_name = Some(first_endpoint.name("domain_name").unwrap().as_str().to_string());
180        uri_parts.port = Some(
181            first_endpoint
182                .name("port")
183                .unwrap()
184                .as_str()
185                .parse::<u16>()
186                .expect("port not a valid u16"),
187        );
188        uri_parts.scheme = first_endpoint
189            .name("scheme")
190            .map(|scheme| scheme.as_str().to_string());
191        Ok(uri_parts)
192    }
193}
194
195#[derive(new, Debug, Clone, Hash, PartialEq, Eq)]
196pub struct DelegationToken {
197    value: String,
198    expiry_time: Option<u64>,
199}
200
201impl DelegationToken {
202    pub fn get_value(&self) -> String {
203        self.value.clone()
204    }
205
206    pub fn get_expiry_time(&self) -> Option<u64> {
207        self.expiry_time
208    }
209}
210
211#[derive(From, Shrinkwrap, Debug, Clone, Hash, PartialEq, Eq)]
212pub struct Timestamp(pub u64);
213
214#[derive(From, Shrinkwrap, Debug, Display, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
215pub struct Scope {
216    pub name: String,
217}
218
219#[derive(From, Shrinkwrap, Debug, Display, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
220pub struct Stream {
221    pub name: String,
222}
223
224#[derive(From, Shrinkwrap, Debug, Display, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
225pub struct Reader {
226    pub name: String,
227}
228
229#[derive(Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
230pub struct Segment {
231    pub number: i64,
232    pub tx_id: Option<TxId>,
233}
234
235impl Segment {
236    pub fn from(number: i64) -> Self {
237        Segment { number, tx_id: None }
238    }
239
240    pub fn from_id_and_epoch(segment_id: i32, epoch: i32) -> Self {
241        let epoch_i64 = (epoch as i64) << 32;
242        let id_i64 = segment_id as i64;
243        Segment {
244            number: epoch_i64 + id_i64,
245            tx_id: None,
246        }
247    }
248
249    pub fn from_txn(number: i64, tx_id: TxId) -> Self {
250        Segment {
251            number,
252            tx_id: Some(tx_id),
253        }
254    }
255
256    pub fn is_transaction_segment(&self) -> bool {
257        self.tx_id.is_some()
258    }
259
260    pub fn get_epoch(&self) -> i32 {
261        (self.number >> 32) as i32
262    }
263
264    pub fn get_segment_number(&self) -> i32 {
265        self.number as i32
266    }
267}
268
269impl fmt::Debug for Segment {
270    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271        f.debug_struct("Segment")
272            .field("segment", &self.get_segment_number())
273            .field("epoch", &self.get_epoch())
274            .finish()
275    }
276}
277
278#[derive(new, Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
279pub struct ScopedStream {
280    pub scope: Scope,
281    pub stream: Stream,
282}
283
284impl From<&ScopedSegment> for ScopedStream {
285    fn from(scoped_segment: &ScopedSegment) -> Self {
286        ScopedStream {
287            scope: scoped_segment.scope.clone(),
288            stream: scoped_segment.stream.clone(),
289        }
290    }
291}
292
293impl From<&str> for ScopedStream {
294    fn from(string: &str) -> Self {
295        let buf = string.split('/').collect::<Vec<&str>>();
296        ScopedStream {
297            scope: Scope {
298                name: buf[0].to_string(),
299            },
300            stream: Stream {
301                name: buf[1].to_string(),
302            },
303        }
304    }
305}
306
307///
308/// This represents the continuation token returned by the controller
309/// as part of the list streams grpc API.
310///
311#[derive(new, Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
312pub struct CToken {
313    pub token: String,
314}
315
316impl CToken {
317    pub fn empty() -> CToken {
318        CToken {
319            token: String::from(""),
320        }
321    }
322}
323
324impl From<&str> for CToken {
325    fn from(string: &str) -> Self {
326        CToken {
327            token: string.to_string(),
328        }
329    }
330}
331
332#[derive(new, Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
333pub struct ScopedSegment {
334    pub scope: Scope,
335    pub stream: Stream,
336    pub segment: Segment,
337}
338
339impl ScopedSegment {
340    pub fn get_scoped_stream(&self) -> ScopedStream {
341        ScopedStream::new(self.scope.clone(), self.stream.clone())
342    }
343}
344
345impl From<&str> for ScopedSegment {
346    fn from(qualified_name: &str) -> Self {
347        if NameUtils::is_transaction_segment(qualified_name) {
348            let original_segment_name = NameUtils::get_parent_stream_segment_name(qualified_name);
349            ScopedSegment::from(original_segment_name)
350        } else {
351            let mut tokens = NameUtils::extract_segment_tokens(qualified_name.to_owned());
352            let segment_id = tokens.pop().expect("get segment id from tokens");
353            let stream_name = tokens.pop().expect("get stream name from tokens");
354
355            if tokens.is_empty() {
356                // scope not present
357                ScopedSegment {
358                    scope: Scope {
359                        name: String::from(""),
360                    },
361                    stream: Stream { name: stream_name },
362                    segment: Segment {
363                        number: segment_id.parse::<i64>().expect("parse string to i64"),
364                        tx_id: None,
365                    },
366                }
367            } else {
368                let scope = tokens.pop().expect("get scope from tokens");
369                ScopedSegment {
370                    scope: Scope { name: scope },
371                    stream: Stream { name: stream_name },
372                    segment: Segment {
373                        number: segment_id.parse::<i64>().expect("parse string to i64"),
374                        tx_id: None,
375                    },
376                }
377            }
378        }
379    }
380}
381
382#[derive(From, Shrinkwrap, Copy, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
383pub struct TxId(pub u128);
384
385impl TxId {
386    ///
387    /// Obtain epoch from a given Transaction Id.
388    ///
389    pub fn get_epoch(&self) -> i32 {
390        (self.0 >> 96) as i32
391    }
392}
393
394impl Display for TxId {
395    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
396        f.write_str(Uuid::from_u128(self.0).to_hyphenated().to_string().as_str())?;
397        Ok(())
398    }
399}
400
401impl fmt::Debug for TxId {
402    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403        f.write_str(Uuid::from_u128(self.0).to_hyphenated().to_string().as_str())?;
404        Ok(())
405    }
406}
407
408impl Display for ScopedStream {
409    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
410        f.write_str(&self.scope.name)?;
411        f.write_char('/')?;
412        f.write_str(&self.stream.name)?;
413        Ok(())
414    }
415}
416
417impl Display for ScopedSegment {
418    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
419        f.write_str(&NameUtils::get_qualified_stream_segment_name(
420            &self.scope.name,
421            &self.stream.name,
422            self.segment.number,
423            self.segment.tx_id,
424        ))?;
425        Ok(())
426    }
427}
428
429#[derive(From, Shrinkwrap, Copy, Clone, Hash, PartialEq, Eq)]
430pub struct WriterId(pub u128);
431
432impl Debug for WriterId {
433    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
434        write!(f, "{:x}", self.0)
435    }
436}
437
438impl Display for WriterId {
439    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
440        write!(f, "{:x}", self.0)
441    }
442}
443
444#[derive(Debug, Clone, Hash, PartialEq, Eq, FromPrimitive)]
445pub enum ScaleType {
446    FixedNumSegments = 0,
447    ByRateInKbytesPerSec = 1,
448    ByRateInEventsPerSec = 2,
449}
450
451#[derive(Debug, Clone, Hash, PartialEq, Eq)]
452pub struct Scaling {
453    pub scale_type: ScaleType,
454    pub target_rate: i32,
455    pub scale_factor: i32,
456    pub min_num_segments: i32,
457}
458
459impl Default for Scaling {
460    fn default() -> Self {
461        Scaling {
462            scale_type: ScaleType::FixedNumSegments,
463            min_num_segments: 1,
464            scale_factor: 1,
465            target_rate: 0,
466        }
467    }
468}
469
470#[derive(Debug, Clone, Hash, PartialEq, Eq, FromPrimitive)]
471pub enum RetentionType {
472    None = 0,
473    Time = 1,
474    Size = 2,
475}
476
477#[derive(Debug, Clone, Hash, PartialEq, Eq)]
478pub enum PingStatus {
479    Ok = 0,
480    Committed = 1,
481    Aborted = 2,
482}
483
484#[derive(Debug, Clone, Hash, PartialEq, Eq)]
485pub enum TransactionStatus {
486    Open = 0,
487    Committing = 1,
488    Committed = 2,
489    Aborting = 3,
490    Aborted = 4,
491}
492
493#[derive(Debug, Clone, Hash, PartialEq, Eq)]
494pub struct Retention {
495    pub retention_type: RetentionType,
496    pub retention_param: i64,
497}
498
499impl Default for Retention {
500    fn default() -> Self {
501        Retention {
502            retention_type: RetentionType::None,
503            retention_param: 0,
504        }
505    }
506}
507
508#[derive(new, Debug, Clone, Hash, PartialEq, Eq)]
509pub struct StreamConfiguration {
510    pub scoped_stream: ScopedStream,
511    pub scaling: Scaling,
512    pub retention: Retention,
513    pub tags: Option<Vec<String>>,
514}
515
516#[derive(new, Debug, Clone)]
517pub struct StreamCut {
518    pub scoped_stream: ScopedStream,
519    pub segment_offset_map: HashMap<i64, i64>,
520}
521
522const PREFIX_LENGTH: usize = 2;
523
524#[derive(new, Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
525pub struct SegmentWithRange {
526    pub scoped_segment: ScopedSegment,
527    pub min_key: OrderedFloat<f64>,
528    pub max_key: OrderedFloat<f64>,
529}
530
531impl SegmentWithRange {
532    pub fn get_segment(&self) -> Segment {
533        self.scoped_segment.segment.clone()
534    }
535}
536
537impl fmt::Display for SegmentWithRange {
538    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
539        let segment_str = self.scoped_segment.to_string();
540        write!(
541            f,
542            "{:02}{}{}-{}",
543            segment_str.len(),
544            segment_str,
545            self.min_key,
546            self.max_key
547        )
548    }
549}
550
551impl From<&str> for SegmentWithRange {
552    fn from(name: &str) -> Self {
553        let segment_name_length: usize = name[..PREFIX_LENGTH].parse().expect("parse prefix length");
554
555        let segment_str = &*name[PREFIX_LENGTH..PREFIX_LENGTH + segment_name_length]
556            .parse::<String>()
557            .expect("parse segment name");
558
559        let scoped_segment: ScopedSegment = segment_str.into();
560
561        let rest_string = name[PREFIX_LENGTH + segment_name_length..]
562            .parse::<String>()
563            .expect("parse segment name");
564
565        let mut parts: Vec<&str> = rest_string.split('-').collect();
566        let max_key = parts
567            .pop()
568            .expect("get max key")
569            .parse::<OrderedFloat<f64>>()
570            .expect("parse OrderedFloat from str");
571        let min_key = parts
572            .pop()
573            .expect("get max key")
574            .parse::<OrderedFloat<f64>>()
575            .expect("parse OrderedFloat from str");
576
577        SegmentWithRange {
578            scoped_segment,
579            min_key,
580            max_key,
581        }
582    }
583}
584
585#[derive(Debug, Clone, Hash, PartialEq, Eq)]
586pub struct StreamSegments {
587    pub key_segment_map: OrdMap<OrderedFloat<f64>, SegmentWithRange>,
588}
589
590impl StreamSegments {
591    const SEED: u64 = 1741865571; // This is the hashcode of String "EventRouter" in Java client
592
593    pub fn new(map_key_segment: BTreeMap<OrderedFloat<f64>, SegmentWithRange>) -> StreamSegments {
594        StreamSegments::assert_valid(&map_key_segment);
595        StreamSegments {
596            key_segment_map: map_key_segment.into(),
597        }
598    }
599
600    fn assert_valid(map: &BTreeMap<OrderedFloat<f64>, SegmentWithRange>) {
601        if !map.is_empty() {
602            let (min_key, _min_seg) = map.iter().next().expect("Error reading min key");
603            let (max_key, _max_seg) = map.iter().next_back().expect("Error read max key");
604            assert!(
605                min_key.gt(&OrderedFloat(0.0)),
606                "Min key is expected to be greater than 0.0"
607            );
608            assert!(max_key.ge(&OrderedFloat(1.0)), "Last Key is missing");
609            assert!(
610                max_key.lt(&OrderedFloat(1.0001)),
611                "Segments should have values only up to 1.0"
612            );
613        }
614    }
615
616    /// Selects a segment using a routing key.
617    pub fn get_segment_for_routing_key(
618        &self,
619        routing_key: &Option<String>,
620        rand_f64: fn() -> f64,
621    ) -> &ScopedSegment {
622        if let Some(key) = routing_key {
623            self.get_segment_for_string(key)
624        } else {
625            self.get_segment(rand_f64())
626        }
627    }
628
629    pub fn get_segment(&self, key: f64) -> &ScopedSegment {
630        assert!(OrderedFloat(key).ge(&OrderedFloat(0.0)), "Key should be >= 0.0");
631        assert!(OrderedFloat(key).le(&OrderedFloat(1.0)), "Key should be <= 1.0");
632        let r = self
633            .key_segment_map
634            .get_next(&OrderedFloat(key))
635            .expect("No matching segment found for the given key");
636        &r.1.scoped_segment
637    }
638
639    pub fn get_segment_for_string(&self, str: &str) -> &ScopedSegment {
640        let mut buffer_u16 = vec![0; str.len()];
641
642        // convert uft-8 encoded Rust string to utf-16.
643        mem::convert_str_to_utf16(str, &mut buffer_u16);
644
645        // the utf-16 is stored as u16 array, convert it to u8 array
646        let (prefix, buffer_u8, suffix) = unsafe { buffer_u16.align_to::<u8>() };
647        assert!(prefix.is_empty());
648        assert!(suffix.is_empty());
649
650        let (upper, _lower) = murmurhash3_x64_128(buffer_u8, StreamSegments::SEED);
651
652        // takes the first 64 bit as Java client uses asLong method.
653        let key = u64_to_f64_fraction(upper);
654        self.get_segment(key)
655    }
656
657    pub fn get_segments(&self) -> Vec<ScopedSegment> {
658        self.key_segment_map
659            .values()
660            .map(|v| v.scoped_segment.to_owned())
661            .collect::<Vec<ScopedSegment>>()
662    }
663
664    pub fn apply_replacement_range(
665        &self,
666        segment_replace: &Segment,
667        replacement_ranges: &StreamSegmentsWithPredecessors,
668    ) -> Result<StreamSegments, String> {
669        let mut replaced_ranges = replacement_ranges
670            .replacement_segments
671            .get(segment_replace)
672            .unwrap_or_else(|| panic!("Empty set of replacements"))
673            .clone();
674
675        replaced_ranges.sort_by_key(|k| Reverse(k.max_key));
676        let replaced_ranges_ref = &replaced_ranges;
677        StreamSegments::verify_continuous(replaced_ranges_ref)
678            .expect("Replacement ranges are not continuous");
679
680        let mut result: BTreeMap<OrderedFloat<f64>, SegmentWithRange> = BTreeMap::new();
681        for (key, seg) in self.key_segment_map.iter().rev() {
682            if segment_replace.number == seg.scoped_segment.segment.number {
683                // segment should be replaced.
684                for new_segment in replaced_ranges_ref {
685                    let lower_bound = self.key_segment_map.range(..key).next_back();
686                    match lower_bound {
687                        None => {
688                            result.insert(min(new_segment.max_key, *key), new_segment.clone());
689                        }
690                        Some(lower_bound_value) => {
691                            if new_segment.max_key.ge(lower_bound_value.0) {
692                                result.insert(min(new_segment.max_key, *key), new_segment.clone());
693                            }
694                        }
695                    };
696                }
697            } else {
698                result.insert(*key, seg.clone());
699            }
700        }
701
702        Ok(StreamSegments::new(result))
703    }
704
705    fn verify_continuous(segment_replace_ranges: &[SegmentWithRange]) -> Result<(), String> {
706        let mut previous = segment_replace_ranges.index(0).max_key;
707        for x in segment_replace_ranges {
708            if x.max_key.0.ne(&previous.0) {
709                return Err("Replacement segments are not continuous".to_string());
710            }
711            previous = x.min_key;
712        }
713        Ok(())
714    }
715}
716
717#[derive(new, Debug, Clone, Hash, PartialEq, Eq)]
718pub struct TxnSegments {
719    pub stream_segments: StreamSegments,
720    pub tx_id: TxId,
721}
722
723#[derive(Debug, Clone, Hash, PartialEq, Eq)]
724pub struct StreamSegmentsWithPredecessors {
725    pub segment_with_predecessors: ImHashMap<SegmentWithRange, Vec<Segment>>,
726    pub replacement_segments: ImHashMap<Segment, Vec<SegmentWithRange>>, // inverse lookup
727}
728
729impl StreamSegmentsWithPredecessors {
730    pub fn new(
731        segment_with_predecessor: ImHashMap<SegmentWithRange, Vec<Segment>>,
732    ) -> StreamSegmentsWithPredecessors {
733        let mut replacement_map: HashMap<Segment, Vec<SegmentWithRange>> = HashMap::new();
734        for (segment, predecessor) in &segment_with_predecessor {
735            for predecessor_segment in predecessor {
736                let predecessor = predecessor_segment.clone();
737                let mut replacement_segments = replacement_map
738                    .get(&predecessor)
739                    .get_or_insert(&Vec::new())
740                    .clone();
741                replacement_segments.push((*segment).clone());
742                replacement_map.insert(predecessor, replacement_segments.to_vec());
743            }
744        }
745        StreamSegmentsWithPredecessors {
746            segment_with_predecessors: segment_with_predecessor,
747            replacement_segments: replacement_map.into(), // convert to immutable map.
748        }
749    }
750
751    // implicitly indicating that the current stream is sealed.
752    // See issue https://github.com/pravega/pravega/issues/1684
753    pub fn is_stream_sealed(&self) -> bool {
754        self.segment_with_predecessors.is_empty()
755    }
756}
757
758// convert u64 to 0.0 - 1.0 in f64
759pub(crate) fn u64_to_f64_fraction(hash: u64) -> f64 {
760    let shifted = (hash >> 12) & 0x000f_ffff_ffff_ffff_u64;
761    f64::from_bits(0x3ff0_0000_0000_0000_u64 + shifted) - 1.0
762}
763
764#[derive(new, Debug, Clone, Hash, PartialEq, Eq)]
765pub struct EventRead {
766    pub event: Vec<u8>,
767}
768
769/// A client for looking at and editing the metadata related to a specific segment.
770#[derive(new, Debug, Clone, Hash, PartialEq, Eq)]
771pub struct SegmentInfo {
772    /// Which segment these properties relate to.
773    pub segment: ScopedSegment,
774
775    /// The offset at which data is available. In the event the stream has never been truncated this
776    /// is 0. However, if all data below a certain offset has been truncated, that offset will be
777    /// provide here. (Offsets are left absolute even if data is truncated so that positions in the
778    /// segment can be referred to consistently)
779    pub starting_offset: i64,
780
781    /// The offset at which new data would be written if it were to be added. This is equal to the
782    /// total length of all data written to the segment.
783    pub write_offset: i64,
784
785    /// If the segment is sealed and can no longer be written to.
786    pub is_sealed: bool,
787
788    /// The last time the segment was written to in milliseconds.
789    pub last_modified_time: i64,
790}
791
792/// This struct is used to to skip cert verifications.
793pub struct NoVerifier;
794
795impl ServerCertVerifier for NoVerifier {
796    fn verify_server_cert(
797        &self,
798        _roots: &RootCertStore,
799        _presented_certs: &[rustls::Certificate],
800        _dns_name: DNSNameRef,
801        _ocsp_response: &[u8],
802    ) -> Result<ServerCertVerified, TLSError> {
803        Ok(ServerCertVerified::assertion())
804    }
805}
806
807#[cfg(test)]
808mod test {
809    use super::*;
810    use std::convert::From;
811    use std::net::{IpAddr, Ipv4Addr};
812
813    #[test]
814    fn test_hash() {
815        let s = "hello";
816        let mut buffer_u16 = vec![0; s.len()];
817
818        mem::convert_str_to_utf16(s, &mut buffer_u16);
819
820        let (prefix, buffer_u8, suffix) = unsafe { buffer_u16.align_to::<u8>() };
821        assert!(prefix.is_empty());
822        assert!(suffix.is_empty());
823
824        let (upper, _lower) = murmurhash3_x64_128(&buffer_u8, StreamSegments::SEED);
825        assert_eq!(u64_to_f64_fraction(upper), 0.658716230571337);
826    }
827
828    #[test]
829    fn test_segment_with_range() {
830        let segment = SegmentWithRange {
831            scoped_segment: ScopedSegment {
832                scope: Scope::from("scope".to_owned()),
833                stream: Stream::from("stream".to_owned()),
834                segment: Segment::from(0),
835            },
836            min_key: OrderedFloat::from(0.0),
837            max_key: OrderedFloat::from(1.0),
838        };
839
840        let segment_string = &*segment.to_string();
841
842        let segment_from_string: SegmentWithRange = segment_string.into();
843
844        assert_eq!(segment_from_string, segment);
845    }
846
847    #[test]
848    fn test_pravega_node_uri() {
849        let uri = PravegaNodeUri("127.0.0.1:9090".to_string());
850        assert_eq!(PravegaNodeUri::from("127.0.0.1:9090"), uri);
851        assert_eq!(PravegaNodeUri::from(("127.0.0.1", 9090)), uri);
852        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9090);
853        assert_eq!(PravegaNodeUri::from(socket_addr), uri);
854        assert_eq!(uri.domain_name(), "127.0.0.1".to_string());
855        assert_eq!(uri.port(), 9090);
856        assert_eq!(uri.to_socket_addr(), socket_addr);
857
858        let uri = PravegaNodeUri("localhost:9090".to_string());
859        assert_eq!(PravegaNodeUri::from("localhost:9090"), uri);
860        assert_eq!(PravegaNodeUri::from(("localhost", 9090)), uri);
861        assert_eq!(uri.domain_name(), "localhost".to_string());
862        assert_eq!(uri.port(), 9090);
863        assert_eq!(uri.to_socket_addr(), socket_addr);
864
865        assert_eq!(
866            PravegaNodeUri::uri_parts_from_string("127.0.0.1:9090".to_string()).unwrap(),
867            PravegaNodeUriParts {
868                scheme: None,
869                domain_name: Some("127.0.0.1".to_string()),
870                port: Some(9090)
871            }
872        );
873
874        let uri_with_scheme = PravegaNodeUri("tls://127.0.0.1:9090".to_string());
875        assert_eq!(
876            PravegaNodeUri::uri_parts_from_string(uri_with_scheme.to_string()).unwrap(),
877            PravegaNodeUriParts {
878                scheme: Some("tls".to_string()),
879                domain_name: Some("127.0.0.1".to_string()),
880                port: Some(9090),
881            }
882        );
883        assert!(PravegaNodeUri::is_well_formed(uri_with_scheme.to_string()));
884        assert_eq!(uri_with_scheme.port(), 9090);
885
886        // test a multi-endpoint uri that java client claims to support in client/src/main/java/io/pravega/client/ClientConfig.java
887        let uri_with_scheme =
888            PravegaNodeUri("ssl://127.0.0.1:9090,127.0.0.1:9091,127.0.0.1:9092".to_string());
889        assert_eq!(
890            PravegaNodeUri::uri_parts_from_string(uri_with_scheme.to_string()).unwrap(),
891            PravegaNodeUriParts {
892                scheme: Some("ssl".to_string()),
893                domain_name: Some("127.0.0.1".to_string()),
894                port: Some(9090),
895            }
896        );
897
898        assert!(PravegaNodeUri::uri_parts_from_string("tls://127.0.0.1://9090".into()).is_err());
899        assert!(!PravegaNodeUri::is_well_formed("tls://127.0.0.1://9090".into()));
900        assert!(PravegaNodeUri("tls://127.0.0.1://9090".to_string())
901            .scheme()
902            .is_err());
903        assert_eq!(
904            PravegaNodeUri("tls://127.0.0.1:9090".to_string())
905                .scheme()
906                .unwrap(),
907            "tls".to_string()
908        );
909        assert!(PravegaNodeUri("".to_string()).scheme().is_err());
910
911        // test ipv6 literal
912        assert!(
913            PravegaNodeUri("tcps://[1762:0:0:0:0:B03:1:AF18]:12345".to_string())
914                .scheme()
915                .is_ok()
916        );
917
918        assert_eq!(
919            PravegaNodeUri::uri_parts_from_string("tcps://[::1]:12345".to_string()).unwrap(),
920            PravegaNodeUriParts {
921                scheme: Some("tcps".to_string()),
922                domain_name: Some("[::1]".to_string()),
923                port: Some(12345),
924            }
925        );
926    }
927
928    #[test]
929    fn test_scoped_segment() {
930        let seg1 = ScopedSegment::from("test/123.#epoch.0");
931        assert_eq!(
932            seg1.stream,
933            Stream {
934                name: "test".to_string()
935            }
936        );
937        assert_eq!(
938            seg1.segment,
939            Segment {
940                number: 123,
941                tx_id: None
942            }
943        );
944        assert_eq!(seg1.to_string(), "/test/123.#epoch.0");
945
946        let seg2 = ScopedSegment::from("scope/test/123");
947        assert_eq!(
948            seg2.scope,
949            Scope {
950                name: "scope".to_string()
951            }
952        );
953        assert_eq!(
954            seg1.stream,
955            Stream {
956                name: "test".to_string()
957            }
958        );
959        assert_eq!(
960            seg1.segment,
961            Segment {
962                number: 123,
963                tx_id: None
964            }
965        );
966        assert_eq!(seg2.to_string(), "scope/test/123.#epoch.0");
967    }
968
969    #[test]
970    fn test_scoped_stream() {
971        let stream = ScopedStream {
972            scope: Scope {
973                name: "scope".to_string(),
974            },
975            stream: Stream {
976                name: "stream".to_string(),
977            },
978        };
979        let segment = ScopedSegment::from("scope/stream/123.#epoch.0");
980        let derived_stream = ScopedStream::from(&segment);
981        assert_eq!(stream, derived_stream);
982    }
983}