1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
use std::future::Future;
use std::pin::Pin;

use futures::StreamExt;
use futures::{pin_mut, select, stream::FuturesUnordered, FutureExt};

#[cfg(not(target_arch = "wasm32"))]
use super::Tcp;
#[cfg(unix)]
use super::Unix;
use crate::channel::handshake::Handshake;
use crate::Channel;
use crate::Result;

use super::WebSocket;

/// Abstraction over any supported provider.
///
/// Every transport appears twice: a plain variant and an `Insecure*` variant
/// wrapping the same provider type. The variant alone decides what
/// `AnyProvider::encrypted` reports (`true` for the plain variants, `false`
/// for the `Insecure*` ones).
pub enum AnyProvider {
    #[cfg(not(target_arch = "wasm32"))]
    /// Encapsulates the TCP provider (only on non-wasm32 targets).
    Tcp(Tcp),
    #[cfg(not(target_arch = "wasm32"))]
    /// Encapsulates the TCP provider without any encryption (only on
    /// non-wasm32 targets).
    InsecureTcp(Tcp),
    #[cfg(unix)]
    /// Encapsulates the Unix provider (only on Unix targets).
    Unix(Unix),
    #[cfg(unix)]
    /// Encapsulates the Unix provider without any encryption (only on Unix
    /// targets).
    InsecureUnix(Unix),
    /// Encapsulates the WebSocket provider.
    Wss(WebSocket),
    /// Encapsulates the WebSocket provider without any encryption.
    InsecureWss(WebSocket),
}

impl AnyProvider {
    /// Waits for the next incoming handshake on the wrapped provider.
    ///
    /// This only forwards to the `next()` method of whichever provider the
    /// value holds; the returned handshake has not been upgraded yet.
    ///
    /// CANCEL SAFETY: this method is cancel-safe, feel free to use it in
    /// select statements.
    ///
    /// ```ignore
    /// while let Ok(handshake) = provider.next_handshake().await {
    ///     let mut chan = handshake.encrypted().await?;
    ///     chan.send("hello!").await?;
    /// }
    /// ```
    #[inline]
    #[cfg(not(target_arch = "wasm32"))]
    pub async fn next_handshake(&self) -> Result<Handshake> {
        // Variants that share a payload type are handled by a single arm.
        match self {
            Self::Tcp(tcp) | Self::InsecureTcp(tcp) => tcp.next().await,
            #[cfg(unix)]
            Self::Unix(unix) | Self::InsecureUnix(unix) => unix.next().await,
            Self::Wss(ws) | Self::InsecureWss(ws) => ws.next().await,
        }
    }

    /// Reports whether this provider is one of the encrypted variants.
    ///
    /// Returns `false` for every `Insecure*` variant and `true` for all
    /// others; the wrapped provider itself is not consulted.
    #[inline]
    #[cfg(not(target_arch = "wasm32"))]
    pub fn encrypted(&self) -> bool {
        match self {
            Self::InsecureTcp(_) | Self::InsecureWss(_) => false,
            #[cfg(unix)]
            Self::InsecureUnix(_) => false,
            Self::Tcp(_) | Self::Wss(_) => true,
            #[cfg(unix)]
            Self::Unix(_) => true,
        }
    }

    /// Consumes the provider and turns it into a [`ChannelIter`] that yields
    /// ready-to-use channels.
    ///
    /// NOTE: You should only use this method as the example shows, since it
    /// uses internal future tooling to avoid using another runtime.
    ///
    /// ```ignore
    /// let mut channels = provider.channels();
    /// while let Ok(mut chan) = channels.next().await {
    ///     chan.send("hello!").await?;
    /// }
    /// ```
    #[inline]
    #[cfg(not(target_arch = "wasm32"))]
    pub fn channels(self) -> ChannelIter {
        // Start with no upgrades in flight.
        let futures = FuturesUnordered::new();
        ChannelIter {
            listener: self,
            futures,
        }
    }
}

/// Iterator-like source of channels, driven through its async `next` method.
///
/// NOTE: not completely zero-cost — every encrypted handshake that is queued
/// is stored as a boxed, type-erased future.
pub struct ChannelIter {
    // Provider that new handshakes are pulled from.
    listener: AnyProvider,
    // Encrypted-handshake upgrades that have been started but have not
    // produced their channel yet.
    futures: FuturesUnordered<Pin<Box<dyn Future<Output = Result<Channel>> + Send + 'static>>>, // not Sync or UnwindSafe
}

impl ChannelIter {
    /// Waits for the next ready channel from the provider.
    ///
    /// Each call awaits one new handshake from the listener while also
    /// driving any encrypted upgrades that are still queued:
    ///
    /// - if the listener's `encrypted()` is `false`, the handshake is turned
    ///   into a channel with `raw()` and returned right away;
    /// - otherwise the handshake's `encrypted()` upgrade is queued and the
    ///   call resolves with whichever queued upgrade finishes first.
    ///
    /// # Errors
    ///
    /// Returns the listener's error if obtaining the handshake fails, or the
    /// error produced by a queued upgrade.
    pub async fn next(&mut self) -> Result<Channel> {
        // Fused so that `select!` stops polling it once it has yielded a
        // handshake; at most one handshake is accepted per call.
        let hs = self.listener.next_handshake().fuse();
        pin_mut!(hs);

        loop {
            select! {
                // `select_next_some` reports itself as terminated while the
                // queue is empty, so `select!` skips it and parks on `hs`.
                // Polling `next()` here instead would resolve to `None`
                // immediately and turn this loop into a busy spin.
                chan = self.futures.select_next_some() => return chan,
                res = hs => {
                    let handshake: Handshake = res?;
                    if !self.listener.encrypted() {
                        return Ok(handshake.raw());
                    }
                    // Queue the upgrade. From here on `hs` is terminated, so
                    // the next iteration waits on the (now non-empty) queue.
                    self.futures.push(Box::pin(handshake.encrypted()));
                },
            }
        }
    }
}