ray/
client.rs

1use crate::error::RayError;
2use crate::rate_limiter::{RateLimiterHandle, RateLimiterState};
3use crate::request::{Meta, Request};
4use crate::RayConfig;
5use std::path::{Path, PathBuf};
6use std::sync::{Arc, Mutex, OnceLock};
7use std::time::Duration;
8
9/// HTTP transport abstraction used by `Client`.
10pub trait Transport: Send + Sync {
11    /// Send a JSON payload to the Ray server.
12    fn post_json(&self, url: &str, body: Vec<u8>) -> Result<(), RayError>;
13}
14
/// `Transport` implementation backed by a `ureq` agent.
///
/// Only compiled when the `transport-ureq` feature is enabled.
#[cfg(feature = "transport-ureq")]
#[derive(Debug, Clone)]
struct UreqTransport {
    /// Agent used to issue every request.
    agent: ureq::Agent,
    /// Timeout applied to each individual request.
    timeout: Duration,
}
21
#[cfg(feature = "transport-ureq")]
impl UreqTransport {
    /// Build a transport with a default-configured agent, remembering `timeout`
    /// so it can be attached to each request.
    fn new(timeout: Duration) -> Self {
        Self {
            agent: ureq::AgentBuilder::new().build(),
            timeout,
        }
    }
}
29
#[cfg(feature = "transport-ureq")]
impl Transport for UreqTransport {
    /// Post `body` as `application/json` using the stored agent and timeout.
    ///
    /// # Errors
    ///
    /// * `RayError::HttpStatus` when `ureq` reports a status error; the response
    ///   body is included when it can be read as a string, otherwise it is empty.
    /// * A transport error (via `RayError::transport`) for every other failure.
    fn post_json(&self, url: &str, body: Vec<u8>) -> Result<(), RayError> {
        let request = self
            .agent
            .post(url)
            .timeout(self.timeout)
            .set("Content-Type", "application/json");

        match request.send_bytes(&body) {
            Ok(_) => Ok(()),
            Err(ureq::Error::Status(status, response)) => Err(RayError::HttpStatus {
                status,
                // Best effort: an unreadable error body is reported as empty.
                body: response.into_string().unwrap_or_default(),
            }),
            Err(other) => Err(RayError::transport(other)),
        }
    }
}
50
/// `Transport` implementation backed by `reqwest`'s blocking client.
///
/// Only compiled when `transport-reqwest` is enabled and `transport-ureq` is not.
#[cfg(all(not(feature = "transport-ureq"), feature = "transport-reqwest"))]
#[derive(Debug, Clone)]
struct ReqwestBlockingTransport {
    /// Blocking client used to issue every request.
    client: reqwest::blocking::Client,
}
56
#[cfg(all(not(feature = "transport-ureq"), feature = "transport-reqwest"))]
impl ReqwestBlockingTransport {
    /// Build a transport whose client uses `timeout`.
    ///
    /// If the builder fails, a default `reqwest::blocking::Client` is used
    /// instead (without the requested timeout).
    fn new(timeout: Duration) -> Self {
        let built = reqwest::blocking::Client::builder()
            .timeout(timeout)
            .build();

        let client = match built {
            Ok(client) => client,
            Err(_) => reqwest::blocking::Client::new(),
        };

        Self { client }
    }
}
68
#[cfg(all(not(feature = "transport-ureq"), feature = "transport-reqwest"))]
impl Transport for ReqwestBlockingTransport {
    /// Post `body` as `application/json` using the blocking client.
    ///
    /// # Errors
    ///
    /// * A transport error (via `RayError::transport`) when the request itself fails.
    /// * `RayError::HttpStatus` when the response status is not a success; the
    ///   response text is included when readable, otherwise it is empty.
    fn post_json(&self, url: &str, body: Vec<u8>) -> Result<(), RayError> {
        let outcome = self
            .client
            .post(url)
            .header(reqwest::header::CONTENT_TYPE, "application/json")
            .body(body)
            .send();

        let response = match outcome {
            Ok(response) => response,
            Err(err) => return Err(RayError::transport(err)),
        };

        if response.status().is_success() {
            return Ok(());
        }

        // Capture the status first: reading the text consumes the response.
        let status = response.status().as_u16();
        let body = response.text().unwrap_or_default();
        Err(RayError::HttpStatus { status, body })
    }
}
90
/// Placeholder transport compiled when neither `transport-ureq` nor
/// `transport-reqwest` is enabled.
#[cfg(all(not(feature = "transport-ureq"), not(feature = "transport-reqwest")))]
#[derive(Debug, Clone, Default)]
struct NoTransport;
94
95#[cfg(all(not(feature = "transport-ureq"), not(feature = "transport-reqwest")))]
96impl Transport for NoTransport {
97    fn post_json(&self, _url: &str, _body: Vec<u8>) -> Result<(), RayError> {
98        let err = std::io::Error::new(
99            std::io::ErrorKind::Other,
100            "no HTTP transport enabled (enable `transport-ureq` or `transport-reqwest`)",
101        );
102        Err(RayError::transport(err))
103    }
104}
105
106/// Sends serialized Ray requests over the configured transport.
107///
108/// ```rust
109/// use ray::{Client, RayConfig};
110///
111/// let config = RayConfig {
112///     enabled: false,
113///     ..Default::default()
114/// };
115/// let client = Client::new(config);
116/// assert!(!client.config().enabled);
117/// ```
118#[derive(Clone)]
119pub struct Client {
120    config: RayConfig,
121    meta: Arc<Meta>,
122    transport: Arc<dyn Transport>,
123    rate_limiter: Arc<Mutex<RateLimiterState>>,
124    cwd: PathBuf,
125
126    #[cfg(feature = "transport-reqwest")]
127    async_client: reqwest::Client,
128}
129
130impl std::fmt::Debug for Client {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        let mut s = f.debug_struct("Client");
133        s.field("config", &self.config);
134        s.field("meta", &self.meta);
135        s.field("cwd", &self.cwd);
136
137        #[cfg(feature = "transport-reqwest")]
138        {
139            s.field("async_client", &"reqwest::Client");
140        }
141
142        s.finish()
143    }
144}
145
146impl Client {
147    /// Create a new client from a `RayConfig`.
148    pub fn new(config: RayConfig) -> Self {
149        let meta = Arc::new(Meta::from_config(&config));
150        let timeout = Duration::from_millis(config.timeout_ms.get());
151        let rate_limiter = Arc::new(Mutex::new(RateLimiterState::default()));
152
153        #[cfg(feature = "transport-ureq")]
154        let transport: Arc<dyn Transport> = Arc::new(UreqTransport::new(timeout));
155
156        #[cfg(all(not(feature = "transport-ureq"), feature = "transport-reqwest"))]
157        let transport: Arc<dyn Transport> = Arc::new(ReqwestBlockingTransport::new(timeout));
158
159        #[cfg(all(not(feature = "transport-ureq"), not(feature = "transport-reqwest")))]
160        let transport: Arc<dyn Transport> = Arc::new(NoTransport::default());
161
162        let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
163
164        Self {
165            config,
166            meta,
167            transport,
168            rate_limiter,
169            cwd,
170            #[cfg(feature = "transport-reqwest")]
171            async_client: reqwest::Client::builder()
172                .timeout(timeout)
173                .build()
174                .unwrap_or_else(|_| reqwest::Client::new()),
175        }
176    }
177
178    #[cfg(test)]
179    pub(crate) fn new_with_transport(config: RayConfig, transport: Arc<dyn Transport>) -> Self {
180        let meta = Arc::new(Meta::from_config(&config));
181        let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
182        let rate_limiter = Arc::new(Mutex::new(RateLimiterState::default()));
183
184        #[cfg(feature = "transport-reqwest")]
185        let timeout = Duration::from_millis(config.timeout_ms.get());
186
187        Self {
188            config,
189            meta,
190            transport,
191            rate_limiter,
192            cwd,
193            #[cfg(feature = "transport-reqwest")]
194            async_client: reqwest::Client::builder()
195                .timeout(timeout)
196                .build()
197                .unwrap_or_else(|_| reqwest::Client::new()),
198        }
199    }
200
201    /// Return the shared default client.
202    pub fn global() -> Arc<Client> {
203        static CLIENT: OnceLock<Arc<Client>> = OnceLock::new();
204        CLIENT.get_or_init(|| Arc::new(Client::default())).clone()
205    }
206
207    /// Return the active configuration.
208    pub fn config(&self) -> &RayConfig {
209        &self.config
210    }
211
212    /// Return the client metadata payload.
213    pub fn meta(&self) -> &Arc<Meta> {
214        &self.meta
215    }
216
217    pub(crate) fn rate_limiter_handle(&self) -> RateLimiterHandle {
218        RateLimiterHandle::new(Arc::clone(&self.rate_limiter))
219    }
220
221    /// Return the client's working directory.
222    pub fn cwd(&self) -> &Path {
223        &self.cwd
224    }
225
226    /// Send a request to Ray when the client is enabled.
227    pub fn send_request(&self, request: &Request) -> Result<(), RayError> {
228        if !self.config.enabled {
229            return Ok(());
230        }
231
232        self.send_request_unchecked(request)
233    }
234
235    /// Alias for `send_request`.
236    pub fn send(&self, request: &Request) -> Result<(), RayError> {
237        self.send_request(request)
238    }
239
240    pub(crate) fn send_request_unchecked(&self, request: &Request) -> Result<(), RayError> {
241        let url = self.config.endpoint_url();
242        let body = serde_json::to_vec(request)?;
243
244        self.transport.post_json(&url, body)
245    }
246
247    #[cfg(feature = "transport-reqwest")]
248    /// Send a request to Ray asynchronously when the client is enabled.
249    pub async fn send_request_async(&self, request: &Request) -> Result<(), RayError> {
250        if !self.config.enabled {
251            return Ok(());
252        }
253
254        self.send_request_async_unchecked(request).await
255    }
256
257    #[cfg(feature = "transport-reqwest")]
258    pub(crate) async fn send_request_async_unchecked(
259        &self,
260        request: &Request,
261    ) -> Result<(), RayError> {
262        let url = self.config.endpoint_url();
263        let body = serde_json::to_vec(request)?;
264
265        let resp = self
266            .async_client
267            .post(url)
268            .header(reqwest::header::CONTENT_TYPE, "application/json")
269            .body(body)
270            .send()
271            .await;
272
273        match resp {
274            Ok(resp) if resp.status().is_success() => Ok(()),
275            Ok(resp) => {
276                let status = resp.status().as_u16();
277                let body = resp.text().await.unwrap_or_default();
278                Err(RayError::HttpStatus { status, body })
279            }
280            Err(err) => Err(RayError::transport(err)),
281        }
282    }
283}
284
285impl Default for Client {
286    fn default() -> Self {
287        Self::new(RayConfig::load())
288    }
289}