1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#![deny(missing_docs)]
//! # Gsb Router
//!
//! ```no_run
//! use ya_sb_router::{InstanceConfig, RouterConfig};
//!
//! #[actix_rt::main]
//! async fn main() {
//!     let mut config = RouterConfig::from_env();
//!     config.gc_interval_secs(60);
//!     InstanceConfig::new(config).run_url(None).await;
//! }
//!
//! ```
use std::io;
use std::net::SocketAddr;

use futures::prelude::*;
use tokio::net::{TcpStream, ToSocketAddrs};

pub use config::RouterConfig;
pub use router::InstanceConfig;
#[cfg(unix)]
pub use unix::connect;
use ya_sb_proto::codec::{GsbMessage, GsbMessageCodec, ProtocolError};
use ya_sb_proto::*;

mod config;
mod connection;
mod router;

/// Starts in background new server instance on given tcp address.
pub async fn bind_tcp_router(addr: SocketAddr) -> Result<(), std::io::Error> {
    actix_rt::spawn(
        InstanceConfig::new(RouterConfig::from_env())
            .bind_tcp(addr)
            .await?,
    );
    Ok(())
}

#[cfg(unix)]
mod unix {
    use std::path::Path;

    use tokio::net::UnixStream;

    use super::*;

    #[doc(hidden)]
    pub async fn connect(
        gsb_addr: GsbAddr,
    ) -> (
        Box<dyn Sink<GsbMessage, Error = ProtocolError> + Unpin>,
        Box<dyn Stream<Item = Result<GsbMessage, ProtocolError>> + Unpin>,
    ) {
        match gsb_addr {
            GsbAddr::Tcp(addr) => {
                let (sink, stream) = tcp_connect(addr).await;
                (Box::new(sink), Box::new(stream))
            }
            GsbAddr::Unix(path) => {
                let (sink, stream) = unix_connect(path).await;
                (Box::new(sink), Box::new(stream))
            }
        }
    }

    pub async fn unix_connect<P: AsRef<Path>>(
        path: P,
    ) -> (
        impl Sink<GsbMessage, Error = ProtocolError>,
        impl Stream<Item = Result<GsbMessage, ProtocolError>>,
    ) {
        let sock = UnixStream::connect(path).await.expect("Connect failed");
        let framed = tokio_util::codec::Framed::new(sock, GsbMessageCodec::default());
        framed.split()
    }
}

#[cfg(not(unix))]
#[doc(hidden)]
pub async fn connect(
    gsb_addr: GsbAddr,
) -> (
    Box<dyn Sink<GsbMessage, Error = ProtocolError> + Unpin>,
    Box<dyn Stream<Item = Result<GsbMessage, ProtocolError>> + Unpin>,
) {
    match gsb_addr {
        GsbAddr::Tcp(addr) => {
            let (sink, stream) = tcp_connect(addr).await;
            (Box::new(sink), Box::new(stream))
        }
        GsbAddr::Unix(_) => panic!("Unix sockets not supported on this OS"),
    }
}

/// Starts in background new server instance on given gsb address.
pub async fn bind_gsb_router(gsb_url: Option<url::Url>) -> io::Result<()> {
    let _join_handle = actix_rt::spawn(
        InstanceConfig::new(RouterConfig::from_env())
            .bind_url(gsb_url)
            .await?,
    );
    Ok(())
}

#[doc(hidden)]
pub async fn tcp_connect(
    addr: impl ToSocketAddrs,
) -> (
    impl Sink<GsbMessage, Error = ProtocolError>,
    impl Stream<Item = Result<GsbMessage, ProtocolError>>,
) {
    let sock = TcpStream::connect(&addr).await.expect("Connect failed");
    let framed = tokio_util::codec::Framed::new(sock, GsbMessageCodec::default());
    framed.split()
}

#[cfg(feature = "tls")]
mod tls;

#[cfg(feature = "tls")]
pub use tls::*;