onebot_api/communication/
sse.rs1use super::utils::*;
2use crate::error::{ServiceStartError, ServiceStartResult};
3use async_trait::async_trait;
4use bytes::Bytes;
5use eventsource_stream::{EventStream, Eventsource};
6use futures::{Stream, StreamExt};
7use reqwest::IntoUrl;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use tokio::select;
11use tokio::sync::broadcast;
12use url::Url;
13
14#[derive(Debug, Clone)]
15pub struct SseService {
16 url: Url,
17 access_token: Option<String>,
18 event_sender: Option<InternalEventSender>,
19 close_signal_sender: broadcast::Sender<()>,
20 is_running: Arc<AtomicBool>,
21 }
25
26impl Drop for SseService {
27 fn drop(&mut self) {
28 self.uninstall();
29 }
30}
31
32impl SseService {
33 pub fn new(
34 url: impl IntoUrl,
35 access_token: Option<String>,
36 ) -> reqwest::Result<Self> {
39 let (close_signal_sender, _) = broadcast::channel(1);
40 Ok(Self {
42 url: url.into_url()?,
43 access_token,
44 event_sender: None,
45 close_signal_sender,
46 is_running: Arc::new(AtomicBool::new(false)),
47 })
51 }
52
53 pub async fn eventsource(
54 &self,
55 ) -> anyhow::Result<EventStream<impl Stream<Item = reqwest::Result<Bytes>>>> {
56 let client = reqwest::Client::new();
57 let mut req = client.get(self.url.clone());
58 if let Some(token) = &self.access_token {
59 req = req.header("Authorization", "Bearer ".to_string() + token);
60 }
61 let eventsource = req.send().await?.bytes_stream().eventsource();
62 Ok(eventsource)
63 }
64
65 async fn eventsource_listener(self) -> anyhow::Result<()> {
66 let mut close_signal = self.close_signal_sender.subscribe();
67 let mut es = self.eventsource().await?;
68 let event_sender = self.event_sender.clone().unwrap();
69 loop {
70 select! {
71 _ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
72 event_option = es.next() => {
73 if let Some(Ok(es_event)) = event_option {
74 let event = serde_json::from_str(&es_event.data);
75 if event.is_err() {
76 continue
77 }
78 let _ = event_sender.send_async(event?).await;
79 }
80 }
81 }
82 }
83 }
84
85 }
99
100#[async_trait]
101impl CommunicationService for SseService {
102 fn install(&mut self, _api_receiver: InternalAPIReceiver, event_sender: InternalEventSender) {
103 self.event_sender = Some(event_sender);
104 }
105
106 fn uninstall(&mut self) {
107 self.stop();
108 self.event_sender = None;
109 }
110
111 fn stop(&self) {
112 let _ = self.close_signal_sender.send(());
113 self.is_running.store(false, Ordering::Relaxed);
114 }
115
116 async fn start(&self) -> ServiceStartResult<()> {
117 if self.is_running.load(Ordering::Relaxed) {
118 return Err(ServiceStartError::TaskIsRunning);
119 }
120
121 if self.event_sender.is_none() {
122 return Err(ServiceStartError::NotInjectedEventSender);
123 }
124
125 self.is_running.store(true, Ordering::Relaxed);
126 tokio::spawn(Self::eventsource_listener(self.clone()));
127
128 Ok(())
129 }
130}