misskey_api/streaming/channel/
queue_stats.rs

1use serde::{Deserialize, Serialize};
2
3#[derive(Deserialize, Debug, Clone)]
4#[serde(rename_all = "camelCase")]
5pub struct QueueJobStats {
6    pub active_since_prev_tick: u64,
7    pub active: u64,
8    pub waiting: u64,
9    pub delayed: u64,
10}
11
12#[derive(Deserialize, Debug, Clone)]
13#[serde(rename_all = "camelCase")]
14pub struct QueueStats {
15    pub deliver: QueueJobStats,
16    pub inbox: QueueJobStats,
17}
18
19#[derive(Deserialize, Debug, Clone)]
20#[serde(rename_all = "camelCase", tag = "type", content = "body")]
21pub enum QueueStatsEvent {
22    Stats(QueueStats),
23    StatsLog(Vec<QueueStats>),
24}
25
26#[derive(Serialize, Debug, Clone)]
27#[serde(rename_all = "camelCase", tag = "type", content = "body")]
28pub enum Message {
29    RequestLog { id: String, length: u64 },
30}
31
32#[derive(Serialize, Default, Debug, Clone)]
33pub struct Request {}
34
35impl misskey_core::streaming::ConnectChannelRequest for Request {
36    type Incoming = QueueStatsEvent;
37    type Outgoing = Message;
38
39    const NAME: &'static str = "queueStats";
40}
41
#[cfg(test)]
mod tests {
    use super::{Message, QueueStatsEvent, Request};
    use crate::test::websocket::TestClient;

    use futures::{future, SinkExt, StreamExt};

    /// Connecting to the channel and disconnecting again must both succeed.
    #[tokio::test]
    async fn subscribe_unsubscribe() {
        let client = TestClient::new().await;
        let mut channel = client.channel(Request::default()).await.unwrap();
        channel.disconnect().await.unwrap();
    }

    /// A `Stats` event must arrive on the channel before the timeout expires.
    #[tokio::test]
    async fn stream_stats() {
        use std::time::Duration;

        let client = TestClient::new().await;
        let mut channel = client.channel(Request::default()).await.unwrap();

        // Skip every other event until the first `Stats` shows up.
        let first_stats = async {
            loop {
                if let QueueStatsEvent::Stats(_) = channel.next().await.unwrap().unwrap() {
                    break;
                }
            }
        };

        // margin of 100 ms
        tokio::time::timeout(Duration::from_millis(10100), first_stats)
            .await
            .unwrap();
    }

    /// Sending `RequestLog` must eventually be answered with a `StatsLog` event.
    #[tokio::test]
    async fn stream_stats_log() {
        use ulid_crate::Ulid;

        let client = TestClient::new().await;
        let (mut sink, mut stream) = client.channel(Request::default()).await.unwrap().split();

        let request_log = async {
            let message = Message::RequestLog {
                id: Ulid::new().to_string(),
                length: 50,
            };
            sink.send(message).await.unwrap();
        };

        // Skip every other event until the first `StatsLog` shows up.
        let first_stats_log = async {
            loop {
                if let QueueStatsEvent::StatsLog(_) = stream.next().await.unwrap().unwrap() {
                    break;
                }
            }
        };

        // Drive sending and receiving concurrently.
        future::join(request_log, first_stats_log).await;
    }
}