misskey_api/streaming/channel/
queue_stats.rs1use serde::{Deserialize, Serialize};
2
3#[derive(Deserialize, Debug, Clone)]
4#[serde(rename_all = "camelCase")]
5pub struct QueueJobStats {
6 pub active_since_prev_tick: u64,
7 pub active: u64,
8 pub waiting: u64,
9 pub delayed: u64,
10}
11
12#[derive(Deserialize, Debug, Clone)]
13#[serde(rename_all = "camelCase")]
14pub struct QueueStats {
15 pub deliver: QueueJobStats,
16 pub inbox: QueueJobStats,
17}
18
19#[derive(Deserialize, Debug, Clone)]
20#[serde(rename_all = "camelCase", tag = "type", content = "body")]
21pub enum QueueStatsEvent {
22 Stats(QueueStats),
23 StatsLog(Vec<QueueStats>),
24}
25
26#[derive(Serialize, Debug, Clone)]
27#[serde(rename_all = "camelCase", tag = "type", content = "body")]
28pub enum Message {
29 RequestLog { id: String, length: u64 },
30}
31
32#[derive(Serialize, Default, Debug, Clone)]
33pub struct Request {}
34
35impl misskey_core::streaming::ConnectChannelRequest for Request {
36 type Incoming = QueueStatsEvent;
37 type Outgoing = Message;
38
39 const NAME: &'static str = "queueStats";
40}
41
42#[cfg(test)]
43mod tests {
44 use super::{Message, QueueStatsEvent, Request};
45 use crate::test::websocket::TestClient;
46
47 use futures::{future, SinkExt, StreamExt};
48
49 #[tokio::test]
50 async fn subscribe_unsubscribe() {
51 let client = TestClient::new().await;
52 let mut stream = client.channel(Request::default()).await.unwrap();
53 stream.disconnect().await.unwrap();
54 }
55
56 #[tokio::test]
57 async fn stream_stats() {
58 use std::time::Duration;
59
60 let client = TestClient::new().await;
61 let mut stream = client.channel(Request::default()).await.unwrap();
62
63 tokio::time::timeout(Duration::from_millis(10100), async {
65 loop {
66 match stream.next().await.unwrap().unwrap() {
67 QueueStatsEvent::Stats(_) => break,
68 _ => continue,
69 }
70 }
71 })
72 .await
73 .unwrap();
74 }
75
76 #[tokio::test]
77 async fn stream_stats_log() {
78 use ulid_crate::Ulid;
79
80 let client = TestClient::new().await;
81 let (mut sink, mut stream) = client.channel(Request::default()).await.unwrap().split();
82
83 future::join(
84 async {
85 sink.send(Message::RequestLog {
86 id: Ulid::new().to_string(),
87 length: 50,
88 })
89 .await
90 .unwrap();
91 },
92 async {
93 loop {
94 match stream.next().await.unwrap().unwrap() {
95 QueueStatsEvent::StatsLog(_) => break,
96 _ => continue,
97 }
98 }
99 },
100 )
101 .await;
102 }
103}