misskey_api/streaming/channel/
main.rs

1use crate::model::{
2    antenna::Antenna, drive::DriveFile, id::Id, messaging::MessagingMessage, note::Note,
3    notification::Notification, signin::Signin, user::User,
4};
5use crate::streaming::channel::NoOutgoing;
6
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9
10#[derive(Deserialize, Debug, Clone)]
11#[serde(rename_all = "camelCase", tag = "type", content = "body")]
12pub enum MainStreamEvent {
13    ClientSettingUpdated {
14        key: String,
15        value: Value,
16    },
17    ReceiveFollowRequest(User),
18    Notification(Notification),
19    MeUpdated(User),
20    MessagingMessage(MessagingMessage),
21    ReadAllNotifications,
22    ReadAllUnreadMentions,
23    ReadAllAntennas,
24    ReadAllUnreadSpecifiedNotes,
25    ReadAllMessagingMessages,
26    ReadAllAnnouncements,
27    #[cfg(feature = "12-47-0")]
28    #[cfg_attr(docsrs, doc(cfg(feature = "12-47-0")))]
29    ReadAllChannels,
30    MyTokenRegenerated,
31    ReversiNoInvites,
32    /// TODO: Implement
33    ReversiInvited {},
34    /// TODO: Implement
35    PageEvent {},
36    Signin(Signin),
37    Unfollow(User),
38    Follow(User),
39    Followed(User),
40    Reply(Note),
41    Mention(Note),
42    Renote(Note),
43    ReadAntenna(Antenna),
44    UnreadMention(Id<Note>),
45    #[cfg(feature = "12-47-0")]
46    #[cfg_attr(docsrs, doc(cfg(feature = "12-47-0")))]
47    UnreadChannel(Id<Note>),
48    UnreadSpecifiedNote(Id<Note>),
49    UnreadMessagingMessage(MessagingMessage),
50    UnreadNotification(Notification),
51    UnreadAntenna(Antenna),
52    DriveFileCreated(DriveFile),
53    #[cfg(feature = "12-48-0")]
54    #[cfg_attr(docsrs, doc(cfg(feature = "12-48-0")))]
55    UrlUploadFinished {
56        marker: Option<String>,
57        file: DriveFile,
58    },
59}
60
61#[derive(Serialize, Default, Debug, Clone)]
62pub struct Request {}
63
64impl misskey_core::streaming::ConnectChannelRequest for Request {
65    type Incoming = MainStreamEvent;
66    type Outgoing = NoOutgoing;
67
68    const NAME: &'static str = "main";
69}
70
71#[cfg(test)]
72mod tests {
73    use super::{MainStreamEvent, Request};
74    use crate::test::{websocket::TestClient, ClientExt};
75
76    use futures::{future, StreamExt};
77
78    #[tokio::test]
79    async fn subscribe_unsubscribe() {
80        let client = TestClient::new().await;
81        let mut stream = client.channel(Request::default()).await.unwrap();
82        stream.disconnect().await.unwrap();
83    }
84
85    #[tokio::test]
86    async fn reply() {
87        let test_client = TestClient::new().await;
88        // create fresh user (for new main stream) to avoid events
89        // to be captured by other test cases
90        let (_, client) = test_client.admin.create_streaming_user().await;
91
92        let mut stream = client.channel(Request::default()).await.unwrap();
93
94        future::join(
95            async {
96                let note = client.create_note(Some("awesome"), None, None).await;
97                test_client
98                    .user
99                    .create_note(Some("nice"), None, Some(note.id))
100                    .await;
101            },
102            async {
103                loop {
104                    match stream.next().await.unwrap().unwrap() {
105                        MainStreamEvent::Reply(_) => break,
106                        _ => continue,
107                    }
108                }
109            },
110        )
111        .await;
112    }
113
114    #[tokio::test]
115    async fn mention() {
116        let test_client = TestClient::new().await;
117        // ditto
118        let (me, client) = test_client.admin.create_streaming_user().await;
119
120        let mut stream = client.channel(Request::default()).await.unwrap();
121
122        futures::future::join(
123            test_client
124                .user
125                .create_note(Some(&format!("@{} hello", me.username)), None, None),
126            async {
127                loop {
128                    match stream.next().await.unwrap().unwrap() {
129                        MainStreamEvent::Mention(_) => break,
130                        _ => continue,
131                    }
132                }
133            },
134        )
135        .await;
136    }
137
138    #[cfg(feature = "12-48-0")]
139    #[tokio::test]
140    async fn url_upload_finished() {
141        use crate::model::drive::DriveFile;
142
143        // ditto
144        let (_, client) = TestClient::new().await.admin.create_streaming_user().await;
145
146        let mut stream = client.channel(Request::default()).await.unwrap();
147
148        let expected_marker = ulid_crate::Ulid::new().to_string();
149        let expected_comment = ulid_crate::Ulid::new().to_string();
150
151        futures::future::join(
152            client.test(crate::endpoint::drive::files::upload_from_url::Request {
153                url: url::Url::parse("http://example.com/index.html").unwrap(),
154                folder_id: None,
155                is_sensitive: None,
156                force: None,
157                marker: Some(expected_marker.clone()),
158                comment: Some(expected_comment.clone()),
159            }),
160            async {
161                loop {
162                    match stream.next().await.unwrap().unwrap() {
163                        MainStreamEvent::UrlUploadFinished {
164                            marker: Some(marker),
165                            file:
166                                DriveFile {
167                                    comment: Some(comment),
168                                    ..
169                                },
170                        } if marker == expected_marker && comment == expected_comment => break,
171                        _ => continue,
172                    }
173                }
174            },
175        )
176        .await;
177    }
178
179    // TODO: Test the other events
180}