// mcprotocol_rs/transport/http/client.rs
use crate::{protocol::Message, Result};
use async_trait::async_trait;
use reqwest::{header, Client};
use std::sync::Mutex;
use tokio::sync::mpsc;
6
/// Connection settings for [`HttpClient`].
///
/// The type is `Clone` so one configuration can be reused for several
/// clients even though [`HttpClient::new`] takes it by value.
#[derive(Clone)]
pub struct HttpClientConfig {
    /// Server base URL; the client appends `/events` to it to reach the
    /// event stream.
    pub base_url: String,
    /// Optional bearer token. When present it is sent as
    /// `Authorization: Bearer <token>` on every request.
    pub auth_token: Option<String>,
}

// Hand-written instead of derived so that formatting a configuration for
// logs or error messages never reveals the bearer token.
impl std::fmt::Debug for HttpClientConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HttpClientConfig")
            .field("base_url", &self.base_url)
            .field(
                "auth_token",
                &self.auth_token.as_ref().map(|_| "<redacted>"),
            )
            .finish()
    }
}
12
13pub struct HttpClient {
15 config: HttpClientConfig,
16 client: Client,
17 message_endpoint: Mutex<Option<String>>,
18 receiver: Mutex<Option<mpsc::Receiver<Message>>>,
19}
20
21impl HttpClient {
22 pub fn new(config: HttpClientConfig) -> Result<Self> {
24 let mut headers = header::HeaderMap::new();
25 if let Some(token) = &config.auth_token {
26 headers.insert(
27 header::AUTHORIZATION,
28 header::HeaderValue::from_str(&format!("Bearer {}", token))
29 .map_err(|e| crate::Error::Transport(e.to_string()))?,
30 );
31 }
32
33 let client = Client::builder().default_headers(headers).build()?;
34
35 Ok(Self {
36 config,
37 client,
38 message_endpoint: Mutex::new(None),
39 receiver: Mutex::new(None),
40 })
41 }
42}
43
44#[async_trait]
45impl super::HttpTransport for HttpClient {
46 async fn initialize(&mut self) -> Result<()> {
47 let url = format!("{}/events", self.config.base_url);
48 let stream = self
49 .client
50 .get(&url)
51 .header(header::ACCEPT, "text/event-stream")
52 .send()
53 .await
54 .map_err(|e| crate::Error::Transport(e.to_string()))?;
55
56 Ok(())
58 }
59
60 async fn send(&self, message: Message) -> Result<()> {
61 let endpoint = self
62 .message_endpoint
63 .lock()
64 .unwrap()
65 .as_ref()
66 .ok_or_else(|| crate::Error::Protocol("Message endpoint not received".into()))?
67 .clone();
68
69 self.client
70 .post(&endpoint)
71 .json(&message)
72 .send()
73 .await
74 .map_err(|e| crate::Error::Transport(e.to_string()))?;
75
76 Ok(())
77 }
78
79 async fn receive(&self) -> Result<Message> {
80 let mut receiver = self
81 .receiver
82 .lock()
83 .unwrap()
84 .take()
85 .ok_or_else(|| crate::Error::Protocol("SSE connection not established".into()))?;
86
87 let message = receiver
88 .recv()
89 .await
90 .ok_or_else(|| crate::Error::Protocol("SSE connection closed".into()))?;
91
92 *self.receiver.lock().unwrap() = Some(receiver);
93 Ok(message)
94 }
95
96 async fn close(&mut self) -> Result<()> {
97 *self.message_endpoint.lock().unwrap() = None;
98 *self.receiver.lock().unwrap() = None;
99 Ok(())
100 }
101}
102
103pub type DefaultHttpClient = HttpClient;