lightstreamer_rs/client/
builder.rs1use crate::client::{LightstreamerClient, Transport};
13use crate::subscription::{
14 ChannelSubscriptionListener, ItemUpdate, Snapshot, Subscription, SubscriptionMode,
15};
16use crate::utils::LightstreamerError;
17use std::sync::Arc;
18use tokio::sync::{Mutex, Notify, mpsc};
19
20#[derive(Debug, Clone)]
25pub struct ClientConfig {
26 pub server_address: String,
28 pub adapter_set: Option<String>,
30 pub username: Option<String>,
32 pub password: Option<String>,
34 pub transport: Option<Transport>,
36 pub keepalive_interval: Option<u64>,
38 pub idle_timeout: Option<u64>,
40 pub reconnect_timeout: Option<u64>,
42}
43
44impl ClientConfig {
45 #[must_use]
55 pub fn new(server_address: impl Into<String>) -> Self {
56 Self {
57 server_address: server_address.into(),
58 adapter_set: None,
59 username: None,
60 password: None,
61 transport: Some(Transport::WsStreaming),
62 keepalive_interval: None,
63 idle_timeout: None,
64 reconnect_timeout: None,
65 }
66 }
67
68 #[must_use]
70 pub fn adapter_set(mut self, adapter_set: impl Into<String>) -> Self {
71 self.adapter_set = Some(adapter_set.into());
72 self
73 }
74
75 #[must_use]
77 pub fn username(mut self, username: impl Into<String>) -> Self {
78 self.username = Some(username.into());
79 self
80 }
81
82 #[must_use]
84 pub fn password(mut self, password: impl Into<String>) -> Self {
85 self.password = Some(password.into());
86 self
87 }
88
89 #[must_use]
91 pub fn transport(mut self, transport: Transport) -> Self {
92 self.transport = Some(transport);
93 self
94 }
95
96 #[must_use]
98 pub fn keepalive_interval(mut self, interval: u64) -> Self {
99 self.keepalive_interval = Some(interval);
100 self
101 }
102
103 #[must_use]
105 pub fn idle_timeout(mut self, timeout: u64) -> Self {
106 self.idle_timeout = Some(timeout);
107 self
108 }
109
110 #[must_use]
112 pub fn reconnect_timeout(mut self, timeout: u64) -> Self {
113 self.reconnect_timeout = Some(timeout);
114 self
115 }
116}
117
118#[derive(Debug, Clone)]
120pub struct SubscriptionParams {
121 pub mode: SubscriptionMode,
123 pub items: Vec<String>,
125 pub fields: Vec<String>,
127 pub data_adapter: Option<String>,
129 pub snapshot: Option<Snapshot>,
131}
132
133impl SubscriptionParams {
134 #[must_use]
146 pub fn new(mode: SubscriptionMode, items: Vec<String>, fields: Vec<String>) -> Self {
147 Self {
148 mode,
149 items,
150 fields,
151 data_adapter: None,
152 snapshot: Some(Snapshot::Yes),
153 }
154 }
155
156 #[must_use]
158 pub fn data_adapter(mut self, adapter: impl Into<String>) -> Self {
159 self.data_adapter = Some(adapter.into());
160 self
161 }
162
163 #[must_use]
165 pub fn snapshot(mut self, snapshot: Snapshot) -> Self {
166 self.snapshot = Some(snapshot);
167 self
168 }
169}
170
171#[derive(Clone)]
175pub struct SimpleClient {
176 client: Arc<Mutex<LightstreamerClient>>,
177 shutdown_signal: Arc<Notify>,
178}
179
180impl SimpleClient {
181 pub fn new(config: ClientConfig) -> Result<Self, LightstreamerError> {
195 let mut client = LightstreamerClient::new(
196 Some(&config.server_address),
197 config.adapter_set.as_deref(),
198 config.username.as_deref(),
199 config.password.as_deref(),
200 )?;
201
202 if let Some(transport) = config.transport {
204 client
205 .connection_options
206 .set_forced_transport(Some(transport));
207 }
208
209 if let Some(interval) = config.keepalive_interval {
210 client
211 .connection_options
212 .set_keepalive_interval(interval)
213 .map_err(|e| format!("Failed to set keepalive interval: {}", e))?;
214 }
215
216 if let Some(timeout) = config.idle_timeout {
217 client
218 .connection_options
219 .set_idle_timeout(timeout)
220 .map_err(|e| format!("Failed to set idle timeout: {}", e))?;
221 }
222
223 if let Some(timeout) = config.reconnect_timeout {
224 client
225 .connection_options
226 .set_reconnect_timeout(timeout)
227 .map_err(|e| format!("Failed to set reconnect timeout: {}", e))?;
228 }
229
230 Ok(Self {
231 client: Arc::new(Mutex::new(client)),
232 shutdown_signal: Arc::new(Notify::new()),
233 })
234 }
235
236 pub async fn subscribe(
250 &self,
251 params: SubscriptionParams,
252 ) -> Result<mpsc::UnboundedReceiver<ItemUpdate>, LightstreamerError> {
253 let mut subscription =
254 Subscription::new(params.mode, Some(params.items), Some(params.fields))?;
255
256 if let Some(adapter) = params.data_adapter {
257 subscription.set_data_adapter(Some(adapter))?;
258 }
259
260 if let Some(snapshot) = params.snapshot {
261 subscription.set_requested_snapshot(Some(snapshot))?;
262 }
263
264 let (listener, receiver) = ChannelSubscriptionListener::create_channel();
266 subscription.add_listener(Box::new(listener));
267
268 let client_guard = self.client.lock().await;
270 LightstreamerClient::subscribe(client_guard.subscription_sender.clone(), subscription)
271 .await?;
272
273 Ok(receiver)
274 }
275
276 pub async fn connect(&self) -> Result<(), LightstreamerError> {
286 LightstreamerClient::connect(self.client.clone(), self.shutdown_signal.clone()).await
287 }
288
289 pub async fn disconnect(&self) {
291 let mut client_guard = self.client.lock().await;
292 client_guard.disconnect().await;
293 }
294
295 #[must_use]
299 pub fn shutdown_signal(&self) -> Arc<Notify> {
300 self.shutdown_signal.clone()
301 }
302
303 pub fn shutdown(&self) {
305 self.shutdown_signal.notify_waiters();
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;

    /// Each chained setter must leave its value in the matching field.
    #[test]
    fn test_client_config_builder() {
        let address = "http://localhost:8080";
        let built = ClientConfig::new(address)
            .adapter_set("DEMO")
            .username("user")
            .password("pass")
            .transport(Transport::WsStreaming)
            .keepalive_interval(5000)
            .idle_timeout(120000)
            .reconnect_timeout(3000);

        assert_eq!(built.server_address, address);
        assert_eq!(built.adapter_set.as_deref(), Some("DEMO"));
        assert_eq!(built.username.as_deref(), Some("user"));
        assert_eq!(built.password.as_deref(), Some("pass"));
        assert_eq!(built.transport, Some(Transport::WsStreaming));
        assert_eq!(built.keepalive_interval, Some(5000));
        assert_eq!(built.idle_timeout, Some(120000));
        assert_eq!(built.reconnect_timeout, Some(3000));
    }

    /// Constructor arguments and both optional setters must be reflected in the fields.
    #[test]
    fn test_subscription_params_builder() {
        let items = vec![String::from("item1")];
        let fields = vec![String::from("field1")];

        let built = SubscriptionParams::new(SubscriptionMode::Merge, items.clone(), fields.clone())
            .data_adapter("QUOTE_ADAPTER")
            .snapshot(Snapshot::Yes);

        assert_eq!(built.mode, SubscriptionMode::Merge);
        assert_eq!(built.items, items);
        assert_eq!(built.fields, fields);
        assert_eq!(built.data_adapter.as_deref(), Some("QUOTE_ADAPTER"));
        assert!(matches!(built.snapshot, Some(Snapshot::Yes)));
    }
}