Skip to main content

lightstreamer_rs/client/
builder.rs

1/******************************************************************************
2   Author: Joaquín Béjar García
3   Email: jb@taunais.com
4   Date: 25/10/25
5******************************************************************************/
6
7//! Simplified builder API for creating Lightstreamer clients.
8//!
9//! This module provides a high-level, ergonomic API for creating and configuring
10//! Lightstreamer clients with minimal boilerplate.
11
12use crate::client::{LightstreamerClient, Transport};
13use crate::subscription::{
14    ChannelSubscriptionListener, ItemUpdate, Snapshot, Subscription, SubscriptionMode,
15};
16use crate::utils::LightstreamerError;
17use std::sync::Arc;
18use tokio::sync::{Mutex, Notify, mpsc};
19
20/// Configuration for a Lightstreamer client.
21///
22/// This struct provides a simple way to configure all aspects of a Lightstreamer
23/// connection with sensible defaults.
24#[derive(Debug, Clone)]
25pub struct ClientConfig {
26    /// Server address (e.g., "<http://push.lightstreamer.com/lightstreamer>")
27    pub server_address: String,
28    /// Adapter set name (e.g., "DEMO")
29    pub adapter_set: Option<String>,
30    /// Username for authentication
31    pub username: Option<String>,
32    /// Password for authentication
33    pub password: Option<String>,
34    /// Transport type to use
35    pub transport: Option<Transport>,
36    /// Keepalive interval in milliseconds
37    pub keepalive_interval: Option<u64>,
38    /// Idle timeout in milliseconds
39    pub idle_timeout: Option<u64>,
40    /// Reconnect timeout in milliseconds
41    pub reconnect_timeout: Option<u64>,
42}
43
44impl ClientConfig {
45    /// Creates a new configuration with the specified server address.
46    ///
47    /// # Arguments
48    ///
49    /// * `server_address` - The Lightstreamer server URL
50    ///
51    /// # Returns
52    ///
53    /// A new `ClientConfig` with default values
54    #[must_use]
55    pub fn new(server_address: impl Into<String>) -> Self {
56        Self {
57            server_address: server_address.into(),
58            adapter_set: None,
59            username: None,
60            password: None,
61            transport: Some(Transport::WsStreaming),
62            keepalive_interval: None,
63            idle_timeout: None,
64            reconnect_timeout: None,
65        }
66    }
67
68    /// Sets the adapter set name.
69    #[must_use]
70    pub fn adapter_set(mut self, adapter_set: impl Into<String>) -> Self {
71        self.adapter_set = Some(adapter_set.into());
72        self
73    }
74
75    /// Sets the username for authentication.
76    #[must_use]
77    pub fn username(mut self, username: impl Into<String>) -> Self {
78        self.username = Some(username.into());
79        self
80    }
81
82    /// Sets the password for authentication.
83    #[must_use]
84    pub fn password(mut self, password: impl Into<String>) -> Self {
85        self.password = Some(password.into());
86        self
87    }
88
89    /// Sets the transport type.
90    #[must_use]
91    pub fn transport(mut self, transport: Transport) -> Self {
92        self.transport = Some(transport);
93        self
94    }
95
96    /// Sets the keepalive interval in milliseconds.
97    #[must_use]
98    pub fn keepalive_interval(mut self, interval: u64) -> Self {
99        self.keepalive_interval = Some(interval);
100        self
101    }
102
103    /// Sets the idle timeout in milliseconds.
104    #[must_use]
105    pub fn idle_timeout(mut self, timeout: u64) -> Self {
106        self.idle_timeout = Some(timeout);
107        self
108    }
109
110    /// Sets the reconnect timeout in milliseconds.
111    #[must_use]
112    pub fn reconnect_timeout(mut self, timeout: u64) -> Self {
113        self.reconnect_timeout = Some(timeout);
114        self
115    }
116}
117
118/// Subscription parameters for simplified subscription creation.
119#[derive(Debug, Clone)]
120pub struct SubscriptionParams {
121    /// Subscription mode
122    pub mode: SubscriptionMode,
123    /// Items to subscribe to
124    pub items: Vec<String>,
125    /// Fields to subscribe to
126    pub fields: Vec<String>,
127    /// Data adapter name
128    pub data_adapter: Option<String>,
129    /// Snapshot preference
130    pub snapshot: Option<Snapshot>,
131}
132
133impl SubscriptionParams {
134    /// Creates new subscription parameters.
135    ///
136    /// # Arguments
137    ///
138    /// * `mode` - The subscription mode (MERGE, DISTINCT, COMMAND, RAW)
139    /// * `items` - List of items to subscribe to
140    /// * `fields` - List of fields to retrieve
141    ///
142    /// # Returns
143    ///
144    /// A new `SubscriptionParams` instance
145    #[must_use]
146    pub fn new(mode: SubscriptionMode, items: Vec<String>, fields: Vec<String>) -> Self {
147        Self {
148            mode,
149            items,
150            fields,
151            data_adapter: None,
152            snapshot: Some(Snapshot::Yes),
153        }
154    }
155
156    /// Sets the data adapter.
157    #[must_use]
158    pub fn data_adapter(mut self, adapter: impl Into<String>) -> Self {
159        self.data_adapter = Some(adapter.into());
160        self
161    }
162
163    /// Sets the snapshot preference.
164    #[must_use]
165    pub fn snapshot(mut self, snapshot: Snapshot) -> Self {
166        self.snapshot = Some(snapshot);
167        self
168    }
169}
170
171/// Simplified Lightstreamer client with high-level API.
172///
173/// This provides an easy-to-use interface for common Lightstreamer operations.
174#[derive(Clone)]
175pub struct SimpleClient {
176    client: Arc<Mutex<LightstreamerClient>>,
177    shutdown_signal: Arc<Notify>,
178}
179
180impl SimpleClient {
181    /// Creates a new simple client with the given configuration.
182    ///
183    /// # Arguments
184    ///
185    /// * `config` - Client configuration
186    ///
187    /// # Returns
188    ///
189    /// A new `SimpleClient` instance or an error
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if the client cannot be created
194    pub fn new(config: ClientConfig) -> Result<Self, LightstreamerError> {
195        let mut client = LightstreamerClient::new(
196            Some(&config.server_address),
197            config.adapter_set.as_deref(),
198            config.username.as_deref(),
199            config.password.as_deref(),
200        )?;
201
202        // Apply configuration
203        if let Some(transport) = config.transport {
204            client
205                .connection_options
206                .set_forced_transport(Some(transport));
207        }
208
209        if let Some(interval) = config.keepalive_interval {
210            client
211                .connection_options
212                .set_keepalive_interval(interval)
213                .map_err(|e| format!("Failed to set keepalive interval: {}", e))?;
214        }
215
216        if let Some(timeout) = config.idle_timeout {
217            client
218                .connection_options
219                .set_idle_timeout(timeout)
220                .map_err(|e| format!("Failed to set idle timeout: {}", e))?;
221        }
222
223        if let Some(timeout) = config.reconnect_timeout {
224            client
225                .connection_options
226                .set_reconnect_timeout(timeout)
227                .map_err(|e| format!("Failed to set reconnect timeout: {}", e))?;
228        }
229
230        Ok(Self {
231            client: Arc::new(Mutex::new(client)),
232            shutdown_signal: Arc::new(Notify::new()),
233        })
234    }
235
236    /// Subscribes to items and returns a channel receiver for updates.
237    ///
238    /// # Arguments
239    ///
240    /// * `params` - Subscription parameters
241    ///
242    /// # Returns
243    ///
244    /// A receiver for `ItemUpdate` events
245    ///
246    /// # Errors
247    ///
248    /// Returns an error if the subscription cannot be created
249    pub async fn subscribe(
250        &self,
251        params: SubscriptionParams,
252    ) -> Result<mpsc::UnboundedReceiver<ItemUpdate>, LightstreamerError> {
253        let mut subscription =
254            Subscription::new(params.mode, Some(params.items), Some(params.fields))?;
255
256        if let Some(adapter) = params.data_adapter {
257            subscription.set_data_adapter(Some(adapter))?;
258        }
259
260        if let Some(snapshot) = params.snapshot {
261            subscription.set_requested_snapshot(Some(snapshot))?;
262        }
263
264        // Create channel listener
265        let (listener, receiver) = ChannelSubscriptionListener::create_channel();
266        subscription.add_listener(Box::new(listener));
267
268        // Add subscription to client
269        let client_guard = self.client.lock().await;
270        LightstreamerClient::subscribe(client_guard.subscription_sender.clone(), subscription)
271            .await?;
272
273        Ok(receiver)
274    }
275
276    /// Connects to the Lightstreamer server.
277    ///
278    /// # Returns
279    ///
280    /// A future that resolves when the connection is established
281    ///
282    /// # Errors
283    ///
284    /// Returns an error if the connection fails
285    pub async fn connect(&self) -> Result<(), LightstreamerError> {
286        LightstreamerClient::connect(self.client.clone(), self.shutdown_signal.clone()).await
287    }
288
289    /// Disconnects from the Lightstreamer server.
290    pub async fn disconnect(&self) {
291        let mut client_guard = self.client.lock().await;
292        client_guard.disconnect().await;
293    }
294
295    /// Gets a clone of the shutdown signal.
296    ///
297    /// This can be used to trigger shutdown from external code.
298    #[must_use]
299    pub fn shutdown_signal(&self) -> Arc<Notify> {
300        self.shutdown_signal.clone()
301    }
302
303    /// Triggers a shutdown of the client.
304    pub fn shutdown(&self) {
305        self.shutdown_signal.notify_waiters();
306    }
307}
308
309#[cfg(test)]
310mod tests {
311    use super::*;
312
313    #[test]
314    fn test_client_config_builder() {
315        let config = ClientConfig::new("http://localhost:8080")
316            .adapter_set("DEMO")
317            .username("user")
318            .password("pass")
319            .transport(Transport::WsStreaming)
320            .keepalive_interval(5000)
321            .idle_timeout(120000)
322            .reconnect_timeout(3000);
323
324        assert_eq!(config.server_address, "http://localhost:8080");
325        assert_eq!(config.adapter_set, Some("DEMO".to_string()));
326        assert_eq!(config.username, Some("user".to_string()));
327        assert_eq!(config.password, Some("pass".to_string()));
328        assert_eq!(config.transport, Some(Transport::WsStreaming));
329        assert_eq!(config.keepalive_interval, Some(5000));
330        assert_eq!(config.idle_timeout, Some(120000));
331        assert_eq!(config.reconnect_timeout, Some(3000));
332    }
333
334    #[test]
335    fn test_subscription_params_builder() {
336        let params = SubscriptionParams::new(
337            SubscriptionMode::Merge,
338            vec!["item1".to_string()],
339            vec!["field1".to_string()],
340        )
341        .data_adapter("QUOTE_ADAPTER")
342        .snapshot(Snapshot::Yes);
343
344        assert_eq!(params.mode, SubscriptionMode::Merge);
345        assert_eq!(params.items, vec!["item1".to_string()]);
346        assert_eq!(params.fields, vec!["field1".to_string()]);
347        assert_eq!(params.data_adapter, Some("QUOTE_ADAPTER".to_string()));
348        assert!(matches!(params.snapshot, Some(Snapshot::Yes)));
349    }
350}