misskey_api/streaming/channel/
messaging_index.rs

1use crate::model::{id::Id, messaging::MessagingMessage};
2use crate::streaming::channel::NoOutgoing;
3
4use serde::{Deserialize, Serialize};
5
6#[derive(Deserialize, Debug, Clone)]
7#[serde(rename_all = "camelCase", tag = "type", content = "body")]
8pub enum MessagingIndexStreamEvent {
9    Message(MessagingMessage),
10    Read(Vec<Id<MessagingMessage>>),
11}
12
13#[derive(Serialize, Default, Debug, Clone)]
14pub struct Request {}
15
16impl misskey_core::streaming::ConnectChannelRequest for Request {
17    type Incoming = MessagingIndexStreamEvent;
18    type Outgoing = NoOutgoing;
19
20    const NAME: &'static str = "messagingIndex";
21}
22
23#[cfg(test)]
24mod tests {
25    use super::{MessagingIndexStreamEvent, Request};
26    use crate::test::{websocket::TestClient, ClientExt};
27
28    use futures::{future, StreamExt};
29
30    #[tokio::test]
31    async fn subscribe_unsubscribe() {
32        let client = TestClient::new().await;
33        let mut stream = client.user.channel(Request::default()).await.unwrap();
34        stream.disconnect().await.unwrap();
35    }
36
37    #[tokio::test]
38    async fn stream_message() {
39        let client = TestClient::new().await;
40        let user = client.user.me().await;
41        let mut stream = client.user.channel(Request::default()).await.unwrap();
42
43        future::join(
44            client
45                .admin
46                .test(crate::endpoint::messaging::messages::create::Request {
47                    text: Some("hi".to_string()),
48                    user_id: Some(user.id),
49                    group_id: None,
50                    file_id: None,
51                }),
52            async {
53                loop {
54                    match stream.next().await.unwrap().unwrap() {
55                        MessagingIndexStreamEvent::Message(_) => break,
56                        _ => continue,
57                    }
58                }
59            },
60        )
61        .await;
62    }
63
64    #[tokio::test]
65    async fn stream_read() {
66        let client = TestClient::new().await;
67        let user = client.user.me().await;
68        let message = client
69            .admin
70            .test(crate::endpoint::messaging::messages::create::Request {
71                text: Some("hi".to_string()),
72                user_id: Some(user.id.clone()),
73                group_id: None,
74                file_id: None,
75            })
76            .await;
77        let mut stream = client.user.channel(Request::default()).await.unwrap();
78
79        future::join(
80            client
81                .user
82                .test(crate::endpoint::messaging::messages::read::Request {
83                    message_id: message.id,
84                }),
85            async {
86                loop {
87                    match stream.next().await.unwrap().unwrap() {
88                        MessagingIndexStreamEvent::Read(_) => break,
89                        _ => continue,
90                    }
91                }
92            },
93        )
94        .await;
95    }
96}