misskey_api/streaming/channel/
messaging.rs

1use crate::model::{id::Id, messaging::MessagingMessage, user::User, user_group::UserGroup};
2
3use serde::{Deserialize, Serialize};
4
5#[derive(Deserialize, Debug, Clone)]
6#[serde(rename_all = "camelCase", tag = "type", content = "body")]
7pub enum MessagingStreamEvent {
8    Message(MessagingMessage),
9    Deleted(Id<MessagingMessage>),
10    Read(Vec<Id<MessagingMessage>>),
11}
12
13#[derive(Serialize, Debug, Clone)]
14#[serde(rename_all = "camelCase", tag = "type", content = "body")]
15pub enum Message {
16    Read { id: Id<MessagingMessage> },
17}
18
19#[derive(Serialize, Debug, Clone)]
20#[serde(rename_all = "camelCase")]
21pub enum Request {
22    Otherparty(Id<User>),
23    Group(Id<UserGroup>),
24}
25
26impl misskey_core::streaming::ConnectChannelRequest for Request {
27    type Incoming = MessagingStreamEvent;
28    type Outgoing = Message;
29
30    const NAME: &'static str = "messaging";
31}
32
33#[cfg(test)]
34mod tests {
35    use super::{Message, MessagingStreamEvent, Request};
36    use crate::test::{websocket::TestClient, ClientExt};
37
38    use futures::{future, SinkExt, StreamExt};
39
40    #[tokio::test]
41    async fn subscribe_unsubscribe_otherparty() {
42        let client = TestClient::new().await;
43        let admin = client.admin.me().await;
44        let mut stream = client
45            .user
46            .channel(Request::Otherparty(admin.id))
47            .await
48            .unwrap();
49        stream.disconnect().await.unwrap();
50    }
51
52    #[tokio::test]
53    async fn subscribe_unsubscribe_group() {
54        let client = TestClient::new().await;
55        let group = client
56            .test(crate::endpoint::users::groups::create::Request {
57                name: "test".to_string(),
58            })
59            .await;
60        let mut stream = client.user.channel(Request::Group(group.id)).await.unwrap();
61        stream.disconnect().await.unwrap();
62    }
63
64    #[tokio::test]
65    async fn stream_message() {
66        let client = TestClient::new().await;
67        let user = client.user.me().await;
68        let admin = client.admin.me().await;
69        let mut stream = client
70            .user
71            .channel(Request::Otherparty(admin.id))
72            .await
73            .unwrap();
74
75        future::join(
76            client
77                .admin
78                .test(crate::endpoint::messaging::messages::create::Request {
79                    text: Some("hi".to_string()),
80                    user_id: Some(user.id),
81                    group_id: None,
82                    file_id: None,
83                }),
84            async {
85                loop {
86                    match stream.next().await.unwrap().unwrap() {
87                        MessagingStreamEvent::Message(_) => break,
88                        _ => continue,
89                    }
90                }
91            },
92        )
93        .await;
94    }
95
96    #[tokio::test]
97    async fn stream_deleted() {
98        let client = TestClient::new().await;
99        let user = client.user.me().await;
100        let admin = client.admin.me().await;
101        let message = client
102            .admin
103            .test(crate::endpoint::messaging::messages::create::Request {
104                text: Some("hi".to_string()),
105                user_id: Some(user.id),
106                group_id: None,
107                file_id: None,
108            })
109            .await;
110        let mut stream = client
111            .user
112            .channel(Request::Otherparty(admin.id))
113            .await
114            .unwrap();
115
116        future::join(
117            client
118                .admin
119                .test(crate::endpoint::messaging::messages::delete::Request {
120                    message_id: message.id,
121                }),
122            async {
123                loop {
124                    match stream.next().await.unwrap().unwrap() {
125                        MessagingStreamEvent::Deleted(_) => break,
126                        _ => continue,
127                    }
128                }
129            },
130        )
131        .await;
132    }
133
134    #[tokio::test]
135    async fn stream_read() {
136        let client = TestClient::new().await;
137        let admin = client.admin.me().await;
138        let user = client.user.me().await;
139        let message = client
140            .user
141            .test(crate::endpoint::messaging::messages::create::Request {
142                text: Some("hi".to_string()),
143                user_id: Some(admin.id.clone()),
144                group_id: None,
145                file_id: None,
146            })
147            .await;
148        let mut user_stream = client
149            .user
150            .channel(Request::Otherparty(admin.id))
151            .await
152            .unwrap();
153        let mut admin_stream = client
154            .admin
155            .channel(Request::Otherparty(user.id))
156            .await
157            .unwrap();
158
159        future::join(
160            async {
161                admin_stream
162                    .send(Message::Read {
163                        id: message.id.clone(),
164                    })
165                    .await
166                    .unwrap();
167            },
168            async {
169                loop {
170                    match user_stream.next().await.unwrap().unwrap() {
171                        MessagingStreamEvent::Read(_) => break,
172                        _ => continue,
173                    }
174                }
175            },
176        )
177        .await;
178    }
179}