misskey_api/streaming/channel/
messaging.rs1use crate::model::{id::Id, messaging::MessagingMessage, user::User, user_group::UserGroup};
2
3use serde::{Deserialize, Serialize};
4
5#[derive(Deserialize, Debug, Clone)]
6#[serde(rename_all = "camelCase", tag = "type", content = "body")]
7pub enum MessagingStreamEvent {
8 Message(MessagingMessage),
9 Deleted(Id<MessagingMessage>),
10 Read(Vec<Id<MessagingMessage>>),
11}
12
13#[derive(Serialize, Debug, Clone)]
14#[serde(rename_all = "camelCase", tag = "type", content = "body")]
15pub enum Message {
16 Read { id: Id<MessagingMessage> },
17}
18
19#[derive(Serialize, Debug, Clone)]
20#[serde(rename_all = "camelCase")]
21pub enum Request {
22 Otherparty(Id<User>),
23 Group(Id<UserGroup>),
24}
25
26impl misskey_core::streaming::ConnectChannelRequest for Request {
27 type Incoming = MessagingStreamEvent;
28 type Outgoing = Message;
29
30 const NAME: &'static str = "messaging";
31}
32
33#[cfg(test)]
34mod tests {
35 use super::{Message, MessagingStreamEvent, Request};
36 use crate::test::{websocket::TestClient, ClientExt};
37
38 use futures::{future, SinkExt, StreamExt};
39
40 #[tokio::test]
41 async fn subscribe_unsubscribe_otherparty() {
42 let client = TestClient::new().await;
43 let admin = client.admin.me().await;
44 let mut stream = client
45 .user
46 .channel(Request::Otherparty(admin.id))
47 .await
48 .unwrap();
49 stream.disconnect().await.unwrap();
50 }
51
52 #[tokio::test]
53 async fn subscribe_unsubscribe_group() {
54 let client = TestClient::new().await;
55 let group = client
56 .test(crate::endpoint::users::groups::create::Request {
57 name: "test".to_string(),
58 })
59 .await;
60 let mut stream = client.user.channel(Request::Group(group.id)).await.unwrap();
61 stream.disconnect().await.unwrap();
62 }
63
64 #[tokio::test]
65 async fn stream_message() {
66 let client = TestClient::new().await;
67 let user = client.user.me().await;
68 let admin = client.admin.me().await;
69 let mut stream = client
70 .user
71 .channel(Request::Otherparty(admin.id))
72 .await
73 .unwrap();
74
75 future::join(
76 client
77 .admin
78 .test(crate::endpoint::messaging::messages::create::Request {
79 text: Some("hi".to_string()),
80 user_id: Some(user.id),
81 group_id: None,
82 file_id: None,
83 }),
84 async {
85 loop {
86 match stream.next().await.unwrap().unwrap() {
87 MessagingStreamEvent::Message(_) => break,
88 _ => continue,
89 }
90 }
91 },
92 )
93 .await;
94 }
95
96 #[tokio::test]
97 async fn stream_deleted() {
98 let client = TestClient::new().await;
99 let user = client.user.me().await;
100 let admin = client.admin.me().await;
101 let message = client
102 .admin
103 .test(crate::endpoint::messaging::messages::create::Request {
104 text: Some("hi".to_string()),
105 user_id: Some(user.id),
106 group_id: None,
107 file_id: None,
108 })
109 .await;
110 let mut stream = client
111 .user
112 .channel(Request::Otherparty(admin.id))
113 .await
114 .unwrap();
115
116 future::join(
117 client
118 .admin
119 .test(crate::endpoint::messaging::messages::delete::Request {
120 message_id: message.id,
121 }),
122 async {
123 loop {
124 match stream.next().await.unwrap().unwrap() {
125 MessagingStreamEvent::Deleted(_) => break,
126 _ => continue,
127 }
128 }
129 },
130 )
131 .await;
132 }
133
134 #[tokio::test]
135 async fn stream_read() {
136 let client = TestClient::new().await;
137 let admin = client.admin.me().await;
138 let user = client.user.me().await;
139 let message = client
140 .user
141 .test(crate::endpoint::messaging::messages::create::Request {
142 text: Some("hi".to_string()),
143 user_id: Some(admin.id.clone()),
144 group_id: None,
145 file_id: None,
146 })
147 .await;
148 let mut user_stream = client
149 .user
150 .channel(Request::Otherparty(admin.id))
151 .await
152 .unwrap();
153 let mut admin_stream = client
154 .admin
155 .channel(Request::Otherparty(user.id))
156 .await
157 .unwrap();
158
159 future::join(
160 async {
161 admin_stream
162 .send(Message::Read {
163 id: message.id.clone(),
164 })
165 .await
166 .unwrap();
167 },
168 async {
169 loop {
170 match user_stream.next().await.unwrap().unwrap() {
171 MessagingStreamEvent::Read(_) => break,
172 _ => continue,
173 }
174 }
175 },
176 )
177 .await;
178 }
179}