tears 0.8.0

A simple and elegant framework for building TUI applications using The Elm Architecture (TEA)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
//! WebSocket subscription for real-time bidirectional communication.
//!
//! This module provides the [`WebSocket`] subscription source for establishing
//! WebSocket connections, receiving messages from WebSocket servers, and sending
//! messages to them.
//!
//! # Design Pattern: Stream-based Bidirectional Communication
//!
//! WebSocket uses the **stream-based bidirectional** pattern. The subscription
//! manages a long-lived connection and provides an `mpsc::UnboundedSender` for
//! immediate send operations. This design reflects the real-time, streaming
//! nature of WebSocket communication.
//!
//! For more details on why this pattern is used instead of `Command`-based
//! sending, see the "Design Philosophy" section in the [`subscription`](crate::subscription)
//! module documentation.
//!
//! # Feature Flag
//!
//! This module is only available when the `ws` feature is enabled:
//!
//! ```toml
//! [dependencies]
//! tears = { version = "0.8", features = ["ws"] }
//! ```
//!
//! ## TLS Support
//!
//! For secure WebSocket connections (wss://), you need to enable one of the TLS features:
//!
//! - `native-tls` - Uses the platform's native TLS implementation
//! - `rustls` - Uses rustls with ring crypto provider and native root certificates
//! - `rustls-tls-webpki-roots` - Uses rustls with ring crypto provider and webpki root certificates
//!
//! Example:
//!
//! ```toml
//! [dependencies]
//! tears = { version = "0.8", features = ["ws", "native-tls"] }
//! ```

use std::hash::{DefaultHasher, Hash, Hasher};

use futures::stream::{BoxStream, SplitSink};
use futures::{SinkExt as _, StreamExt as _, stream};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async};

use super::{SubscriptionId, SubscriptionSource};

/// Commands that can be sent to the WebSocket connection.
#[derive(Debug, Clone)]
pub enum WebSocketCommand {
    /// Send a text message
    SendText(String),
    /// Send a binary message
    SendBinary(Vec<u8>),
    /// Close the WebSocket connection with an optional close frame
    Close(Option<CloseFrame>),
}

/// Messages emitted by the WebSocket subscription.
#[derive(Debug, Clone)]
pub enum WebSocketMessage {
    /// Successfully connected to the WebSocket server, provides command sender
    Connected {
        sender: mpsc::UnboundedSender<WebSocketCommand>,
    },
    /// Disconnected from the WebSocket server (normal closure)
    Disconnected,
    /// A message received from the WebSocket server
    Received(Message),
    /// An error occurred (connection failure or communication error)
    Error { error: String },
}

/// A WebSocket subscription that connects to a WebSocket server and provides
/// bidirectional communication.
///
/// This subscription establishes a WebSocket connection to the specified URL and streams
/// incoming messages. It also provides a command sender through the [`WebSocketMessage::Connected`]
/// message, which allows the application to send messages back to the server.
///
/// ## Connection Behavior
///
/// - Connection is attempted asynchronously when the subscription starts
/// - If the connection fails, an error message is emitted
/// - Once connected, a `Connected` message is emitted with a command sender
/// - Both incoming and outgoing messages are handled in the same connection
///
/// ## Message Flow
///
/// 1. Subscription starts → Connection attempt begins
/// 2. On success → `WebSocketMessage::Connected` is emitted with a command sender
/// 3. Application stores the `sender` and can now send messages
/// 4. Incoming messages are emitted as `WebSocketMessage::Received`
/// 5. On normal disconnection → `WebSocketMessage::Disconnected` is emitted
/// 6. On connection failure or communication error → `WebSocketMessage::Error` is emitted
///
/// ## Disconnection Handling
///
/// The subscription distinguishes between normal disconnection and errors:
/// - `Disconnected`: Server closed connection gracefully, user requested close, or connection ended normally
/// - `Error`: Connection failed, network error, or protocol violation
///
/// ## Example
///
/// ```rust,no_run
/// use tears::subscription::{Subscription, websocket::{WebSocket, WebSocketMessage, WebSocketCommand}};
/// use tokio_tungstenite::tungstenite::Message;
/// use tokio::sync::mpsc;
///
/// enum AppMessage {
///     WebSocketConnected(mpsc::UnboundedSender<WebSocketCommand>),
///     WebSocketDisconnected,
///     WebSocketReceived(String),
///     WebSocketError(String),
/// }
///
/// struct App {
///     ws_sender: Option<mpsc::UnboundedSender<WebSocketCommand>>,
/// }
///
/// impl App {
///     fn update(&mut self, msg: AppMessage) {
///         match msg {
///             AppMessage::WebSocketConnected(sender) => {
///                 self.ws_sender = Some(sender);
///                 // Successfully connected, can now send messages
///             }
///             AppMessage::WebSocketDisconnected => {
///                 self.ws_sender = None;
///                 // Connection closed normally
///             }
///             AppMessage::WebSocketReceived(text) => {
///                 // Handle received message
///             }
///             AppMessage::WebSocketError(error) => {
///                 // Handle connection failure or communication error
///             }
///         }
///     }
///
///     fn send_message(&self, text: String) {
///         if let Some(sender) = &self.ws_sender {
///             let _ = sender.send(WebSocketCommand::SendText(text));
///         }
///     }
/// }
///
/// // Create a WebSocket subscription
/// let ws_sub = Subscription::new(WebSocket::new("wss://example.com/socket"))
///     .map(|msg| match msg {
///         WebSocketMessage::Connected { sender } => AppMessage::WebSocketConnected(sender),
///         WebSocketMessage::Disconnected => AppMessage::WebSocketDisconnected,
///         WebSocketMessage::Received(Message::Text(text)) => AppMessage::WebSocketReceived(text.to_string()),
///         WebSocketMessage::Error { error } => AppMessage::WebSocketError(error),
///         _ => AppMessage::WebSocketError("Unexpected message".to_string()),
///     });
/// ```
///
/// ## Performance Considerations
///
/// - Each unique URL creates a separate WebSocket connection
/// - Connections are maintained as long as the subscription is active
/// - The command sender can be cloned and shared across different parts of the application
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WebSocket {
    url: String,
}

impl WebSocket {
    /// Creates a new WebSocket subscription for the specified URL.
    ///
    /// # Arguments
    ///
    /// * `url` - The WebSocket URL to connect to (e.g., `wss://example.com/socket`)
    ///
    /// # Example
    ///
    /// ```rust
    /// use tears::subscription::websocket::WebSocket;
    ///
    /// let ws = WebSocket::new("wss://echo.websocket.org");
    /// ```
    #[must_use]
    pub fn new(url: impl Into<String>) -> Self {
        Self { url: url.into() }
    }
}

impl WebSocket {
    /// Handle a single command and send the message through WebSocket
    async fn handle_command(
        cmd: WebSocketCommand,
        write: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
        msg_tx: &mpsc::UnboundedSender<WebSocketMessage>,
    ) {
        let result = match cmd {
            WebSocketCommand::SendText(text) => write.send(Message::Text(text.into())).await,
            WebSocketCommand::SendBinary(data) => write.send(Message::Binary(data.into())).await,
            WebSocketCommand::Close(frame) => write.send(Message::Close(frame)).await,
        };

        if let Err(e) = result {
            let _ = msg_tx.send(WebSocketMessage::Error {
                error: e.to_string(),
            });
        }
    }

    /// Main subscription loop that processes incoming messages and outgoing commands
    async fn run_subscription_loop(
        url: String,
        msg_tx: mpsc::UnboundedSender<WebSocketMessage>,
        mut cmd_rx: mpsc::UnboundedReceiver<WebSocketCommand>,
        cmd_tx: mpsc::UnboundedSender<WebSocketCommand>,
    ) {
        // Attempt to connect to WebSocket server
        let ws_stream = match connect_async(&url).await {
            Ok((stream, _)) => stream,
            Err(e) => {
                let _ = msg_tx.send(WebSocketMessage::Error {
                    error: format!("Connection failed: {e}"),
                });
                return;
            }
        };

        // Notify that connection was successful and provide command sender
        if msg_tx
            .send(WebSocketMessage::Connected { sender: cmd_tx })
            .is_err()
        {
            // Receiver dropped, exit early
            return;
        }

        let (mut write, mut read) = ws_stream.split();

        loop {
            tokio::select! {
                // Handle incoming messages from WebSocket server
                msg = read.next() => {
                    match msg {
                        Some(Ok(Message::Close(_))) => {
                            // Server sent close frame - normal disconnection
                            let _ = msg_tx.send(WebSocketMessage::Disconnected);
                            break;
                        }
                        Some(Ok(message)) => {
                            // Regular message (Text, Binary, Ping, Pong)
                            if msg_tx.send(WebSocketMessage::Received(message)).is_err() {
                                // Receiver dropped, exit loop
                                break;
                            }
                        }
                        Some(Err(e)) => {
                            // Communication error
                            let _ = msg_tx.send(WebSocketMessage::Error {
                                error: e.to_string(),
                            });
                            break;
                        }
                        None => {
                            // Connection closed unexpectedly
                            let _ = msg_tx.send(WebSocketMessage::Disconnected);
                            break;
                        }
                    }
                }
                // Handle outgoing commands
                cmd = cmd_rx.recv() => {
                    match cmd {
                        Some(WebSocketCommand::Close(frame)) => {
                            // User requested close - send close frame and notify
                            let _ = write.send(Message::Close(frame)).await;
                            let _ = msg_tx.send(WebSocketMessage::Disconnected);
                            break;
                        }
                        Some(cmd) => {
                            Self::handle_command(cmd, &mut write, &msg_tx).await;
                        }
                        None => {
                            // Command channel closed, treat as disconnection
                            let _ = msg_tx.send(WebSocketMessage::Disconnected);
                            break;
                        }
                    }
                }
            }
        }

        // Clean shutdown: close the write half
        let _ = write.close().await;
    }
}

impl SubscriptionSource for WebSocket {
    type Output = WebSocketMessage;

    fn stream(&self) -> BoxStream<'static, WebSocketMessage> {
        let (msg_tx, msg_rx) = mpsc::unbounded_channel();
        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();

        let url = self.url.clone();

        tokio::spawn(async move {
            // Run the main subscription loop (will send Connected on success)
            Self::run_subscription_loop(url, msg_tx, cmd_rx, cmd_tx).await;
        });

        stream::unfold(msg_rx, |mut rx| async move {
            let msg = rx.recv().await?;
            Some((msg, rx))
        })
        .boxed()
    }

    fn id(&self) -> SubscriptionId {
        let mut hasher = DefaultHasher::new();
        self.hash(&mut hasher);
        SubscriptionId::of::<Self>(hasher.finish())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;

    #[test]
    fn test_ws_new() {
        let ws = WebSocket::new("wss://example.com");
        assert_eq!(ws.url, "wss://example.com");
    }

    #[test]
    fn test_ws_id_consistency() {
        let ws1 = WebSocket::new("wss://example.com");
        let ws2 = WebSocket::new("wss://example.com");

        // Same configuration should produce the same ID
        assert_eq!(ws1.id(), ws2.id());
    }

    #[test]
    fn test_ws_id_different_urls() {
        let ws1 = WebSocket::new("wss://example.com");
        let ws2 = WebSocket::new("wss://different.com");

        // Different urls should produce different IDs
        assert_ne!(ws1.id(), ws2.id());
    }

    #[tokio::test]
    async fn test_stream_emits_error_on_connection_failure() {
        // Use an invalid URL that will fail to connect
        let ws = WebSocket::new("ws://localhost:1");
        let mut stream = ws.stream();

        // First and only message should be Error due to connection failure
        assert!(matches!(
            stream.next().await,
            Some(WebSocketMessage::Error { .. }),
        ));
    }

    #[test]
    fn test_message_variants() {
        // Test that all message variants can be constructed and matched
        let (tx, _rx) = mpsc::unbounded_channel();

        // Test Connected variant with sender
        matches!(
            WebSocketMessage::Connected { sender: tx },
            WebSocketMessage::Connected { .. }
        );

        // Test Disconnected variant
        matches!(
            WebSocketMessage::Disconnected,
            WebSocketMessage::Disconnected
        );

        // Test Received variant
        matches!(
            WebSocketMessage::Received(Message::Text("test".into())),
            WebSocketMessage::Received(_)
        );

        // Test Error variant
        matches!(
            WebSocketMessage::Error {
                error: "test".to_string()
            },
            WebSocketMessage::Error { .. }
        );
    }
}