onebot_api/communication/
sse.rs1use super::utils::*;
2use async_trait::async_trait;
3use bytes::Bytes;
4use eventsource_stream::{EventStream, Eventsource};
5use futures::{Stream, StreamExt};
6use reqwest::IntoUrl;
7use std::sync::Arc;
8use tokio::select;
9use tokio::sync::broadcast;
10use url::Url;
11
12#[derive(Debug, Clone)]
13pub struct SseService {
14 url: Url,
15 access_token: Option<String>,
16 event_sender: Option<EventSender>,
17 close_signal_sender: broadcast::Sender<()>,
18}
19
20impl SseService {
21 pub fn new(url: impl IntoUrl, access_token: Option<String>) -> reqwest::Result<Self> {
22 let (close_signal_sender, _) = broadcast::channel(1);
23 Ok(Self {
24 url: url.into_url()?,
25 access_token,
26 event_sender: None,
27 close_signal_sender,
28 })
29 }
30
31 pub async fn eventsource(
32 &self,
33 ) -> anyhow::Result<EventStream<impl Stream<Item = reqwest::Result<Bytes>>>> {
34 let client = reqwest::Client::new();
35 let mut req = client.get(self.url.clone());
36 if let Some(token) = &self.access_token {
37 req = req.header("Authorization", "Bearer ".to_string() + token);
38 }
39 let eventsource = req.send().await?.bytes_stream().eventsource();
40 Ok(eventsource)
41 }
42
43 async fn eventsource_listener(self) -> anyhow::Result<()> {
44 let mut close_signal = self.close_signal_sender.subscribe();
45 let mut es = self.eventsource().await?;
46 let event_sender = self.event_sender.clone().unwrap();
47 loop {
48 select! {
49 _ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
50 Some(Ok(es_event)) = es.next() => {
51 let event = serde_json::from_str(&es_event.data);
52 if event.is_err() {
53 continue
54 }
55 let _ = event_sender.send(Arc::new(event?));
56 }
57 }
58 }
59 }
60}
61
62#[async_trait]
63impl CommunicationService for SseService {
64 fn inject(&mut self, _api_receiver: APIReceiver, event_sender: EventSender) {
65 self.event_sender = Some(event_sender);
66 }
67
68 async fn start_service(&self) -> anyhow::Result<()> {
69 if self.event_sender.is_none() {
70 return Err(anyhow::anyhow!("event sender is none"));
71 }
72
73 tokio::spawn(Self::eventsource_listener(self.clone()));
74
75 Ok(())
76 }
77}