1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
//! This module defines the `SinkAdapter` and `StreamAdapter` traits.
use crateJointMessage;
use crateResponse;
use async_trait;
/// `SinkAdapter` is a trait that defines the interface for sending messages.
///
/// It is implemented by different types of sinks, such as WebSocket or MPSC.
/// It can be used as the sink adapter when implementing a custom Joint struct.
///
/// # Examples
///
/// ```rust
/// use injoint::connection::SinkAdapter;
/// use tungstenite::Message;
/// use async_trait::async_trait;
/// use injoint::joint::mpsc;
/// use injoint::response::Response;
///
/// #[derive(Clone)]
/// struct WSSink {
///     tx: tokio::sync::mpsc::Sender<Result<Message, tungstenite::Error>>,
/// }
///
/// #[async_trait]
/// impl SinkAdapter for WSSink {
///     async fn send(
///         &mut self,
///         response: Response,
///     ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
///         let message_text = serde_json::to_string(&response)?;
///         self.tx
///             .send(Ok(Message::Text(message_text.into())))
///             .await
///             .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
///         Ok(())
///     }
/// }
/// ```
///
/// `StreamAdapter` is a trait that defines the interface for receiving messages.
///
/// It is implemented by different types of streams, such as WebSocket or MPSC.
/// It can be used as the stream adapter when implementing a custom Joint struct.
///
/// # Examples
///
/// ```rust
/// use injoint::connection::StreamAdapter;
/// use injoint::message::JointMessage;
/// use async_trait::async_trait;
/// use futures_util::stream::SplitStream;
/// use futures_util::{io, StreamExt};
/// use tokio::net::TcpStream;
/// use tokio_tungstenite::WebSocketStream;
/// use tungstenite::Message;
/// use injoint::joint::mpsc;
/// use injoint::response::Response;
///
/// struct WSStream {
///     stream: SplitStream<WebSocketStream<TcpStream>>,
/// }
///
/// #[async_trait]
/// impl StreamAdapter for WSStream {
///     async fn next(&mut self) -> Result<JointMessage, Box<dyn std::error::Error + Send + Sync>> {
///         let message = self.stream.next().await.unwrap()?;
///         let message = match message {
///             Message::Text(text) => text,
///             _ => {
///                 return Err(Box::new(io::Error::new(
///                     io::ErrorKind::InvalidData,
///                     "Invalid data",
///                 )))
///             }
///         };
///         let message = serde_json::from_slice((&message).as_ref())?;
///         Ok(message)
///     }
/// }
/// ```
///