Skip to main content

onebot_api/communication/
sse.rs

1use super::utils::*;
2use async_trait::async_trait;
3use bytes::Bytes;
4use eventsource_stream::{EventStream, Eventsource};
5use futures::{Stream, StreamExt};
6use reqwest::IntoUrl;
7use std::sync::Arc;
8use tokio::select;
9use tokio::sync::broadcast;
10use url::Url;
11
12#[derive(Debug, Clone)]
13pub struct SseService {
14	url: Url,
15	access_token: Option<String>,
16	event_sender: Option<EventSender>,
17	close_signal_sender: broadcast::Sender<()>,
18}
19
20impl SseService {
21	pub fn new(url: impl IntoUrl, access_token: Option<String>) -> reqwest::Result<Self> {
22		let (close_signal_sender, _) = broadcast::channel(1);
23		Ok(Self {
24			url: url.into_url()?,
25			access_token,
26			event_sender: None,
27			close_signal_sender,
28		})
29	}
30
31	pub async fn eventsource(
32		&self,
33	) -> anyhow::Result<EventStream<impl Stream<Item = reqwest::Result<Bytes>>>> {
34		let client = reqwest::Client::new();
35		let mut req = client.get(self.url.clone());
36		if let Some(token) = &self.access_token {
37			req = req.header("Authorization", "Bearer ".to_string() + token);
38		}
39		let eventsource = req.send().await?.bytes_stream().eventsource();
40		Ok(eventsource)
41	}
42
43	async fn eventsource_listener(self) -> anyhow::Result<()> {
44		let mut close_signal = self.close_signal_sender.subscribe();
45		let mut es = self.eventsource().await?;
46		let event_sender = self.event_sender.clone().unwrap();
47		loop {
48			select! {
49				_ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
50				Some(Ok(es_event)) = es.next() => {
51					let event = serde_json::from_str(&es_event.data);
52					if event.is_err() {
53						continue
54					}
55					let _ = event_sender.send(Arc::new(event?));
56				}
57			}
58		}
59	}
60}
61
62#[async_trait]
63impl CommunicationService for SseService {
64	fn inject(&mut self, _api_receiver: APIReceiver, event_sender: EventSender) {
65		self.event_sender = Some(event_sender);
66	}
67
68	async fn start_service(&self) -> anyhow::Result<()> {
69		if self.event_sender.is_none() {
70			return Err(anyhow::anyhow!("event sender is none"));
71		}
72
73		tokio::spawn(Self::eventsource_listener(self.clone()));
74
75		Ok(())
76	}
77}