misskey_api/streaming/channel/
main.rs1use crate::model::{
2 antenna::Antenna, drive::DriveFile, id::Id, messaging::MessagingMessage, note::Note,
3 notification::Notification, signin::Signin, user::User,
4};
5use crate::streaming::channel::NoOutgoing;
6
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9
10#[derive(Deserialize, Debug, Clone)]
11#[serde(rename_all = "camelCase", tag = "type", content = "body")]
12pub enum MainStreamEvent {
13 ClientSettingUpdated {
14 key: String,
15 value: Value,
16 },
17 ReceiveFollowRequest(User),
18 Notification(Notification),
19 MeUpdated(User),
20 MessagingMessage(MessagingMessage),
21 ReadAllNotifications,
22 ReadAllUnreadMentions,
23 ReadAllAntennas,
24 ReadAllUnreadSpecifiedNotes,
25 ReadAllMessagingMessages,
26 ReadAllAnnouncements,
27 #[cfg(feature = "12-47-0")]
28 #[cfg_attr(docsrs, doc(cfg(feature = "12-47-0")))]
29 ReadAllChannels,
30 MyTokenRegenerated,
31 ReversiNoInvites,
32 ReversiInvited {},
34 PageEvent {},
36 Signin(Signin),
37 Unfollow(User),
38 Follow(User),
39 Followed(User),
40 Reply(Note),
41 Mention(Note),
42 Renote(Note),
43 ReadAntenna(Antenna),
44 UnreadMention(Id<Note>),
45 #[cfg(feature = "12-47-0")]
46 #[cfg_attr(docsrs, doc(cfg(feature = "12-47-0")))]
47 UnreadChannel(Id<Note>),
48 UnreadSpecifiedNote(Id<Note>),
49 UnreadMessagingMessage(MessagingMessage),
50 UnreadNotification(Notification),
51 UnreadAntenna(Antenna),
52 DriveFileCreated(DriveFile),
53 #[cfg(feature = "12-48-0")]
54 #[cfg_attr(docsrs, doc(cfg(feature = "12-48-0")))]
55 UrlUploadFinished {
56 marker: Option<String>,
57 file: DriveFile,
58 },
59}
60
61#[derive(Serialize, Default, Debug, Clone)]
62pub struct Request {}
63
64impl misskey_core::streaming::ConnectChannelRequest for Request {
65 type Incoming = MainStreamEvent;
66 type Outgoing = NoOutgoing;
67
68 const NAME: &'static str = "main";
69}
70
71#[cfg(test)]
72mod tests {
73 use super::{MainStreamEvent, Request};
74 use crate::test::{websocket::TestClient, ClientExt};
75
76 use futures::{future, StreamExt};
77
78 #[tokio::test]
79 async fn subscribe_unsubscribe() {
80 let client = TestClient::new().await;
81 let mut stream = client.channel(Request::default()).await.unwrap();
82 stream.disconnect().await.unwrap();
83 }
84
85 #[tokio::test]
86 async fn reply() {
87 let test_client = TestClient::new().await;
88 let (_, client) = test_client.admin.create_streaming_user().await;
91
92 let mut stream = client.channel(Request::default()).await.unwrap();
93
94 future::join(
95 async {
96 let note = client.create_note(Some("awesome"), None, None).await;
97 test_client
98 .user
99 .create_note(Some("nice"), None, Some(note.id))
100 .await;
101 },
102 async {
103 loop {
104 match stream.next().await.unwrap().unwrap() {
105 MainStreamEvent::Reply(_) => break,
106 _ => continue,
107 }
108 }
109 },
110 )
111 .await;
112 }
113
114 #[tokio::test]
115 async fn mention() {
116 let test_client = TestClient::new().await;
117 let (me, client) = test_client.admin.create_streaming_user().await;
119
120 let mut stream = client.channel(Request::default()).await.unwrap();
121
122 futures::future::join(
123 test_client
124 .user
125 .create_note(Some(&format!("@{} hello", me.username)), None, None),
126 async {
127 loop {
128 match stream.next().await.unwrap().unwrap() {
129 MainStreamEvent::Mention(_) => break,
130 _ => continue,
131 }
132 }
133 },
134 )
135 .await;
136 }
137
138 #[cfg(feature = "12-48-0")]
139 #[tokio::test]
140 async fn url_upload_finished() {
141 use crate::model::drive::DriveFile;
142
143 let (_, client) = TestClient::new().await.admin.create_streaming_user().await;
145
146 let mut stream = client.channel(Request::default()).await.unwrap();
147
148 let expected_marker = ulid_crate::Ulid::new().to_string();
149 let expected_comment = ulid_crate::Ulid::new().to_string();
150
151 futures::future::join(
152 client.test(crate::endpoint::drive::files::upload_from_url::Request {
153 url: url::Url::parse("http://example.com/index.html").unwrap(),
154 folder_id: None,
155 is_sensitive: None,
156 force: None,
157 marker: Some(expected_marker.clone()),
158 comment: Some(expected_comment.clone()),
159 }),
160 async {
161 loop {
162 match stream.next().await.unwrap().unwrap() {
163 MainStreamEvent::UrlUploadFinished {
164 marker: Some(marker),
165 file:
166 DriveFile {
167 comment: Some(comment),
168 ..
169 },
170 } if marker == expected_marker && comment == expected_comment => break,
171 _ => continue,
172 }
173 }
174 },
175 )
176 .await;
177 }
178
179 }