1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#![deny(missing_docs)]
//! Holochain utilities for websocket serving and connecting.
//!
//! To establish an outgoing connection, use [`connect`],
//! which will return a tuple
//! ([`WebsocketSender`], [`WebsocketReceiver`]).
//!
//! To open a listening socket, use [`WebsocketListener::bind`],
//! which will give you a [`WebsocketListener`].
//! It is an async Stream whose items resolve to that same tuple
//! ([`WebsocketSender`], [`WebsocketReceiver`]).
//!
//! If you want to be able to shut down the stream, use
//! [`WebsocketListener::bind_with_handle`],
//! which will give you a tuple ([`ListenerHandle`], [`ListenerStream`]).
//! You can use [`ListenerHandle::close`] to close immediately or
//! [`ListenerHandle::close_on`] to close when a future completes.
//!
//! # Example
//!
//! ```
//! use holochain_serialized_bytes::prelude::*;
//! use holochain_websocket::*;
//!
//! use std::convert::TryInto;
//! use tokio_stream::StreamExt;
//! use url2::prelude::*;
//!
//! #[derive(serde::Serialize, serde::Deserialize, SerializedBytes, Debug)]
//! struct TestMessage(pub String);
//!
//! #[tokio::main]
//! async fn main() {
//!     // Create a new server listening for connections
//!     let mut server = WebsocketListener::bind(
//!         url2!("ws://127.0.0.1:0"),
//!         std::sync::Arc::new(WebsocketConfig::default()),
//!     )
//!     .await
//!     .unwrap();
//!
//!     // Get the address of the server
//!     let binding = server.local_addr().clone();
//!
//!     tokio::task::spawn(async move {
//!         // Handle new connections
//!         while let Some(Ok((_send, mut recv))) = server.next().await {
//!             tokio::task::spawn(async move {
//!                 // Receive a message and echo it back
//!                 if let Some((msg, resp)) = recv.next().await {
//!                     // Deserialize the message
//!                     let msg: TestMessage = msg.try_into().unwrap();
//!                     // If this message is a request then we can respond
//!                     if resp.is_request() {
//!                         let msg = TestMessage(format!("echo: {}", msg.0));
//!                         resp.respond(msg.try_into().unwrap()).await.unwrap();
//!                     }
//!                 }
//!             });
//!         }
//!     });
//!
//!     // Connect the client to the server
//!     let (mut send, _recv) = connect(binding, std::sync::Arc::new(WebsocketConfig::default()))
//!         .await
//!         .unwrap();
//!
//!     let msg = TestMessage("test".to_string());
//!     // Make a request and get the echoed response
//!     let rsp: TestMessage = send.request(msg).await.unwrap();
//!
//!     assert_eq!("echo: test", &rsp.0,);
//! }
//!
//! ```
//!

use std::io::Error;
use std::io::ErrorKind;
use std::sync::Arc;

use holochain_serialized_bytes::prelude::*;
use stream_cancel::Valve;
use tracing::instrument;
use url2::Url2;
use util::url_to_addr;
use websocket::Websocket;

mod websocket_config;
pub use websocket_config::*;

#[allow(missing_docs)]
mod error;
pub use error::*;

mod websocket_listener;
pub use websocket_listener::*;

mod websocket_sender;
pub use websocket_sender::*;

mod websocket_receiver;
pub use websocket_receiver::*;

mod websocket;

mod util;

/// Create a new external websocket connection.
///
/// The `url` is resolved to a socket address with `url_to_addr` (using the
/// scheme from `config`), a TCP connection is opened to that address, and the
/// websocket client handshake is then performed over it with the tungstenite
/// settings derived from `config`.
///
/// On success the two ends of the connection are returned as
/// ([`WebsocketSender`], [`WebsocketReceiver`]).
///
/// # Errors
///
/// Returns an error if the address cannot be resolved, if the TCP connection
/// cannot be established, or if the websocket handshake fails. A handshake
/// failure is wrapped in a [`std::io::Error`] of kind [`ErrorKind::Other`].
#[instrument(skip(config))]
pub async fn connect(
    url: Url2,
    config: Arc<WebsocketConfig>,
) -> WebsocketResult<(WebsocketSender, WebsocketReceiver)> {
    let remote = url_to_addr(&url, config.scheme).await?;
    let tcp = tokio::net::TcpStream::connect(remote).await?;

    // TODO: find equivalent of this in new tokio
    // socket.set_keepalive(Some(std::time::Duration::from_secs(
    //     config.tcp_keepalive_s as u64,
    // )))?;

    // Upgrade the raw TCP stream to a websocket; the HTTP response that
    // accompanies the handshake is not needed.
    let handshake = tokio_tungstenite::client_async_with_config(
        url.as_str(),
        tcp,
        Some(config.to_tungstenite()),
    )
    .await;
    let (stream, _response) = handshake.map_err(|err| Error::new(ErrorKind::Other, err))?;
    tracing::debug!("Client connected");

    // A client has no listener that could shut its ends down, so the valve
    // is made a no-op by disabling its trigger straight away.
    let (trigger, valve) = Valve::new();
    trigger.disable();

    Websocket::create_ends(config, stream, valve)
}

#[derive(Debug, serde::Serialize, serde::Deserialize, SerializedBytes)]
#[serde(tag = "type")]
/// The messages actually sent over the wire by this library.
///
/// If you want to implement your own server or client you
/// will need this type, or be able to serialize / deserialize
/// an equivalent of it.
///
/// The enum is internally tagged: the serialized form carries the variant
/// name in a field called `type`, alongside the variant's own fields.
pub enum WireMessage {
    /// A message without a response.
    Signal {
        #[serde(with = "serde_bytes")]
        /// Actual bytes of the message serialized as [message pack](https://msgpack.org/).
        data: Vec<u8>,
    },
    /// A request that requires a response.
    Request {
        /// The id of this request.
        /// Note ids are recycled once they are used.
        id: u64,
        #[serde(with = "serde_bytes")]
        /// Actual bytes of the message serialized as [message pack](https://msgpack.org/).
        data: Vec<u8>,
    },
    /// The response to a request.
    Response {
        /// The id of the request that this response is for.
        id: u64,
        #[serde(with = "serde_bytes")]
        /// Actual bytes of the message serialized as [message pack](https://msgpack.org/).
        ///
        /// NOTE(review): `None` presumably means the response carries no
        /// payload; confirm against the sender / receiver implementation.
        data: Option<Vec<u8>>,
    },
}