Skip to main content

lightstreamer_rs/client/
implementation.rs

1use crate::subscription::{ItemUpdate, Snapshot, Subscription, SubscriptionMode};
2
3use crate::client::Transport;
4pub(crate) use crate::client::listener::ClientListener;
5use crate::client::message_listener::ClientMessageListener;
6use crate::client::model::{ClientStatus, DisconnectionType, LogType};
7use crate::client::request::SubscriptionRequest;
8use crate::client::utils::get_subscription_by_id;
9use crate::connection::{ConnectionDetails, ConnectionOptions};
10use crate::connection::{ConnectionManager, ConnectionState, HeartbeatConfig, ReconnectionConfig};
11use crate::utils::{LightstreamerError, clean_message, parse_arguments};
12use cookie::Cookie;
13use futures_util::{SinkExt, StreamExt};
14use std::collections::HashMap;
15use std::fmt::{self, Debug, Formatter};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::mpsc::channel;
19use tokio::sync::{
20    Mutex, Notify,
21    mpsc::{Receiver, Sender},
22};
23use tokio::time::Instant as TokioInstant;
24use tokio_tungstenite::{
25    connect_async,
26    tungstenite::{
27        Message,
28        http::{HeaderName, HeaderValue, Request},
29    },
30};
31use tracing::{Level, debug, error, info, instrument, trace, warn};
32use url::Url;
33
34/// Facade class for the management of the communication to Lightstreamer Server. Used to provide
35/// configuration settings, event handlers, operations for the control of the connection lifecycle,
36/// Subscription handling and to send messages.
37///
38/// An instance of `LightstreamerClient` handles the communication with Lightstreamer Server on a
39/// specified endpoint. Hence, it hosts one "Session"; or, more precisely, a sequence of Sessions,
40/// since any Session may fail and be recovered, or it can be interrupted on purpose. So, normally,
41/// a single instance of `LightstreamerClient` is needed.
42///
43/// However, multiple instances of `LightstreamerClient` can be used, toward the same or multiple
44/// endpoints.
45///
46/// You can listen to the events generated by a session by registering an event listener, such as
47/// `ClientListener` or `SubscriptionListener`. These listeners allow you to handle various events,
48/// such as session creation, connection status, subscription updates, and server messages. However,
49/// you should be aware that the event notifications are dispatched by a single thread, the so-called
50/// event thread. This means that if the operations of a listener are slow or blocking, they will
51/// delay the processing of the other listeners and affect the performance of your application.
52/// Therefore, you should delegate any slow or blocking operations to a dedicated thread, and keep
53/// the listener methods as fast and simple as possible. Note that even if you create multiple
54/// instances of `LightstreamerClient`, they will all use a single event thread, that is shared
55/// among them.
56///
57/// # Parameters
58///
59/// * `server_address`: the address of the Lightstreamer Server to which this `LightstreamerClient`
60///   will connect to. It is possible to specify it later by using `None` here. See
61///   `ConnectionDetails.setServerAddress()` for details.
62/// * `adapter_set`: the name of the Adapter Set mounted on Lightstreamer Server to be used to handle
63///   all requests in the Session associated with this `LightstreamerClient`. It is possible not to
64///   specify it at all or to specify it later by using `None` here. See `ConnectionDetails.setAdapterSet()`
65///   for details.
66///
67/// # Raises
68///
69/// * `IllegalArgumentException`: if a not valid address is passed. See `ConnectionDetails.setServerAddress()`
70///   for details.
71pub struct LightstreamerClient {
72    /// The address of the Lightstreamer Server to which this `LightstreamerClient` will connect.
73    server_address: Option<String>,
74    /// The name of the Adapter Set mounted on Lightstreamer Server to be used to handle all
75    /// requests in the Session associated with this `LightstreamerClient`.
76    adapter_set: Option<String>,
77    /// Data object that contains the details needed to open a connection to a Lightstreamer Server.
78    /// This instance is set up by the `LightstreamerClient` object at its own creation. Properties
79    /// of this object can be overwritten by values received from a Lightstreamer Server.
80    pub connection_details: ConnectionDetails,
81    /// Data object that contains options and policies for the connection to the server. This instance
82    /// is set up by the `LightstreamerClient` object at its own creation. Properties of this object
83    /// can be overwritten by values received from a Lightstreamer Server.
84    pub connection_options: ConnectionOptions,
85    /// A list of listeners that will receive events from the `LightstreamerClient` instance.
86    listeners: Vec<Box<dyn ClientListener>>,
87    /// A list containing all the `Subscription` instances that are currently "active" on this
88    /// `LightstreamerClient`.
89    subscriptions: Vec<Subscription>,
90    /// The current status of the client.
91    status: ClientStatus,
92    /// Logging Type to be used
93    logging: LogType,
94    /// The sender that can be used to subscribe/unsubscribe
95    pub subscription_sender: Sender<SubscriptionRequest>,
96    /// The receiver used for subscribe/unsubsribe
97    subscription_receiver: Receiver<SubscriptionRequest>,
98    /// Connection manager for automatic reconnection
99    connection_manager: Option<Arc<ConnectionManager>>,
100    /// Configuration for reconnection behavior
101    reconnection_config: ReconnectionConfig,
102    /// Configuration for heartbeat monitoring
103    heartbeat_config: HeartbeatConfig,
104    /// Whether automatic reconnection is enabled
105    auto_reconnect_enabled: bool,
106}
107
108impl Debug for LightstreamerClient {
109    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
110        f.debug_struct("LightstreamerClient")
111            .field("server_address", &self.server_address)
112            .field("adapter_set", &self.adapter_set)
113            .field("connection_details", &self.connection_details)
114            .field("connection_options", &self.connection_options)
115            .field("listeners", &self.listeners)
116            .field("subscriptions", &self.subscriptions)
117            .finish()
118    }
119}
120
121impl LightstreamerClient {
/// Name of this client library.
pub const LIB_NAME: &'static str = "rust_client";

/// Version of this client library.
pub const LIB_VERSION: &'static str = "0.1.0";

//
// Values used while establishing the WebSocket connection.
//
/// Value sent in the `sec-websocket-key` header of the WebSocket opening handshake
/// (a base64-encoded string).
///
/// NOTE(review): this is a fixed value; RFC 6455 describes the key as a nonce chosen
/// randomly for each connection. Confirm that a constant key is acceptable here.
pub const SEC_WEBSOCKET_KEY: &'static str = "PNDUibe9ex7PnsrLbt0N4w==";

/// Value sent in the `sec-websocket-protocol` header: the identifier of the TLCP
/// subprotocol spoken with Lightstreamer over the WebSocket connection.
pub const SEC_WEBSOCKET_PROTOCOL: &'static str = "TLCP-2.4.0.lightstreamer.com";

/// Value sent in the `sec-websocket-version` header. Version 13 is the one used by
/// modern WebSocket implementations.
pub const SEC_WEBSOCKET_VERSION: &'static str = "13";

/// Value sent in the `upgrade` header, asking the server to switch from HTTP to the
/// WebSocket protocol.
pub const SEC_WEBSOCKET_UPGRADE: &'static str = "websocket";

/// Version of the TLCP protocol used by the library.
pub const TLCP_VERSION: &'static str = "TLCP-2.4.0";

/// Extra time, in milliseconds, granted beyond the keepalive interval before the
/// liveness watchdog declares the connection dead.
///
/// The server is expected to send some message (data or `probe`) at least once per
/// keepalive interval; this margin absorbs network jitter and scheduling delays.
pub const KEEPALIVE_GRACE_MS: u64 = 5_000;
156
157    /// Static method that can be used to share cookies between connections to the Server (performed by
158    /// this library) and connections to other sites that are performed by the application. With this
159    /// method, cookies received by the application can be added (or replaced if already present) to
160    /// the cookie set used by the library to access the Server. Obviously, only cookies whose domain
161    /// is compatible with the Server domain will be used internally.
162    ///
163    /// This method should be invoked before calling the `LightstreamerClient.connect()` method.
164    /// However it can be invoked at any time; it will affect the internal cookie set immediately
165    /// and the sending of cookies on the next HTTP request or WebSocket establishment.
166    ///
167    /// # Parameters
168    ///
169    /// * `uri`: the URI from which the supplied cookies were received. It cannot be `None`.
170    /// * `cookies`: an instance of `http.cookies.SimpleCookie`.
171    ///
172    /// See also `getCookies()`
173    pub fn add_cookies(_uri: &str, _cookies: &Cookie) {
174        // Implementation for add_cookies
175        unimplemented!("Implement mechanism to add cookies to LightstreamerClient");
176    }
177
178    /// Adds a listener that will receive events from the `LightstreamerClient` instance.
179    ///
180    /// The same listener can be added to several different `LightstreamerClient` instances.
181    ///
182    /// A listener can be added at any time. A call to add a listener already present will be ignored.
183    ///
184    /// # Parameters
185    ///
186    /// * `listener`: An object that will receive the events as documented in the `ClientListener`
187    ///   interface.
188    ///
189    /// See also `removeListener()`
190    pub fn add_listener(&mut self, listener: Box<dyn ClientListener>) {
191        self.listeners.push(listener);
192    }
193
194    /// Packs s string with the necessary parameters for a subscription request.
195    ///
196    /// # Parameters
197    ///
198    /// * `subscription`: The subscription for which to get the parameters.
199    /// * `request_id`: The request ID to use in the parameters.
200    ///
201    fn get_subscription_params(
202        subscription: &Subscription,
203        request_id: usize,
204    ) -> Result<String, LightstreamerError> {
205        let ls_req_id = request_id.to_string();
206        let ls_sub_id = subscription.id.to_string();
207        let ls_mode = subscription.get_mode().to_string();
208        let ls_group = match subscription.get_item_group() {
209            Some(item_group) => item_group.to_string(),
210            None => match subscription.get_items() {
211                Some(items) => items.join(" "),
212                None => {
213                    return Err(LightstreamerError::invalid_argument(
214                        "No item group or items found in subscription.",
215                    ));
216                }
217            },
218        };
219        let ls_schema = match subscription.get_field_schema() {
220            Some(field_schema) => field_schema.to_string(),
221            None => match subscription.get_fields() {
222                Some(fields) => fields.join(" "),
223                None => {
224                    return Err(LightstreamerError::invalid_argument(
225                        "No field schema or fields found in subscription.",
226                    ));
227                }
228            },
229        };
230        let ls_data_adapter = match subscription.get_data_adapter() {
231            Some(data_adapter) => data_adapter.to_string(),
232            None => "".to_string(),
233        };
234        let ls_snapshot = subscription
235            .get_requested_snapshot()
236            .unwrap_or_default()
237            .to_string();
238        //
239        // Prepare the subscription request.
240        //
241        let mut params: Vec<(&str, &str)> = vec![
242            ("LS_data_adapter", &ls_data_adapter),
243            ("LS_reqId", &ls_req_id),
244            ("LS_op", "add"),
245            ("LS_subId", &ls_sub_id),
246            ("LS_mode", &ls_mode),
247            ("LS_group", &ls_group),
248            ("LS_schema", &ls_schema),
249            ("LS_ack", "false"),
250        ];
251        // Remove the data adapter parameter if not specified.
252        if ls_data_adapter.is_empty() {
253            params.remove(0);
254        }
255        if !ls_snapshot.is_empty() {
256            params.push(("LS_snapshot", &ls_snapshot));
257        }
258
259        Ok(serde_urlencoded::to_string(&params)?)
260    }
261
262    fn get_unsubscription_params(
263        subscription_id: usize,
264        request_id: usize,
265    ) -> Result<String, LightstreamerError> {
266        let ls_req_id = request_id.to_string();
267        let ls_sub_id = subscription_id.to_string();
268        //
269        // Prepare the unsubscription request.
270        //
271        let params: Vec<(&str, &str)> = vec![
272            ("LS_reqId", &ls_req_id),
273            ("LS_op", "delete"),
274            ("LS_subId", &ls_sub_id),
275        ];
276
277        Ok(serde_urlencoded::to_string(&params)?)
278    }
279
280    /// Operation method that requests to open a Session against the configured Lightstreamer Server.
281    ///
282    /// When `connect()` is called, unless a single transport was forced through `ConnectionOptions.setForcedTransport()`,
283    /// the so called "Stream-Sense" mechanism is started: if the client does not receive any answer
284    /// for some seconds from the streaming connection, then it will automatically open a polling
285    /// connection.
286    ///
287    /// A polling connection may also be opened if the environment is not suitable for a streaming
288    /// connection.
289    ///
290    /// Note that as "polling connection" we mean a loop of polling requests, each of which requires
291    /// opening a synchronous (i.e. not streaming) connection to Lightstreamer Server.
292    ///
293    /// Note that the request to connect is accomplished by the client in a separate thread; this
294    /// means that an invocation to `getStatus()` right after `connect()` might not reflect the change
295    /// yet.
296    ///
297    /// When the request to connect is finally being executed, if the current status of the client
298    /// is not `DISCONNECTED`, then nothing will be done.
299    ///
300    /// # Raises
301    ///
302    /// * `IllegalStateException`: if no server address was configured.
303    ///
304    /// See also `getStatus()`
305    ///
306    /// See also `disconnect()`
307    ///
308    /// See also `ClientListener.onStatusChange()`
309    ///
310    /// See also `ConnectionDetails.setServerAddress()`
311    #[instrument(level = "trace")]
312    pub async fn connect(
313        client: Arc<Mutex<Self>>,
314        shutdown_signal: Arc<Notify>,
315    ) -> Result<(), LightstreamerError> {
316        // If auto-reconnect is enabled, use ConnectionManager
317        let auto_reconnect_enabled = {
318            let client_guard = client.lock().await;
319            client_guard.auto_reconnect_enabled
320        };
321
322        if auto_reconnect_enabled {
323            let (reconnection_config, heartbeat_config) = {
324                let client_guard = client.lock().await;
325                (
326                    client_guard.reconnection_config.clone(),
327                    client_guard.heartbeat_config.clone(),
328                )
329            };
330
331            let weak_client = Arc::downgrade(&client);
332            let connection_manager: Arc<ConnectionManager> =
333                ConnectionManager::new(weak_client, reconnection_config, heartbeat_config);
334
335            {
336                let mut client_guard = client.lock().await;
337                client_guard.connection_manager = Some(connection_manager.clone());
338            }
339
340            // Use direct connection method
341            return client.lock().await.connect_direct(shutdown_signal).await;
342        }
343
344        // Fallback to direct connection without auto-reconnect
345        let mut client_guard = client.lock().await;
346        client_guard.connect_direct(shutdown_signal).await
347    }
348
349    /// Direct connection method without automatic reconnection
350    #[instrument(level = "trace")]
351    pub async fn connect_direct(
352        &mut self,
353        shutdown_signal: Arc<Notify>,
354    ) -> Result<(), LightstreamerError> {
355        // Check if the server address is configured.
356        if self.server_address.is_none() {
357            return Err(LightstreamerError::invalid_state(
358                "No server address was configured.",
359            ));
360        }
361        //
362        // Only WebSocket streaming transport is currently supported.
363        //
364        let forced_transport = self.connection_options.get_forced_transport();
365        if forced_transport.is_none_or(|t| *t != Transport::WsStreaming) {
366            return Err(LightstreamerError::invalid_state(
367                "Only WebSocket streaming transport is currently supported.",
368            ));
369        }
370        //
371        // Convert the HTTP URL to a WebSocket URL.
372        //
373        let http_url = self
374            .connection_details
375            .get_server_address()
376            .ok_or_else(|| LightstreamerError::invalid_state("Server address is not set."))?;
377        let mut url = Url::parse(http_url).map_err(|e| {
378            LightstreamerError::invalid_state(format!("Failed to parse server address URL: {}", e))
379        })?;
380        match url.scheme() {
381            "http" => url.set_scheme("ws").map_err(|()| {
382                LightstreamerError::invalid_state("Failed to set scheme to ws for WebSocket URL.")
383            })?,
384            "https" => url.set_scheme("wss").map_err(|()| {
385                LightstreamerError::invalid_state("Failed to set scheme to wss for WebSocket URL.")
386            })?,
387            invalid_scheme => {
388                return Err(LightstreamerError::invalid_state(format!(
389                    "Unsupported scheme '{}' found when converting HTTP URL to WebSocket URL.",
390                    invalid_scheme
391                )));
392            }
393        }
394        let ws_url = url.as_str();
395
396        // Build the WebSocket request with the necessary headers.
397        let request = Request::builder()
398            .uri(ws_url)
399            .header(
400                HeaderName::from_static("connection"),
401                HeaderValue::from_static("Upgrade"),
402            )
403            .header(
404                HeaderName::from_static("host"),
405                HeaderValue::from_str(url.host_str().unwrap_or("localhost")).map_err(|err| {
406                    LightstreamerError::invalid_state(format!(
407                        "Invalid header value for header with name 'host': {}",
408                        err
409                    ))
410                })?,
411            )
412            .header(
413                HeaderName::from_static("sec-websocket-key"),
414                HeaderValue::from_static(Self::SEC_WEBSOCKET_KEY),
415            )
416            .header(
417                HeaderName::from_static("sec-websocket-protocol"),
418                HeaderValue::from_static(Self::SEC_WEBSOCKET_PROTOCOL),
419            )
420            .header(
421                HeaderName::from_static("sec-websocket-version"),
422                HeaderValue::from_static(Self::SEC_WEBSOCKET_VERSION),
423            )
424            .header(
425                HeaderName::from_static("upgrade"),
426                HeaderValue::from_static(Self::SEC_WEBSOCKET_UPGRADE),
427            )
428            .body(())?;
429
430        // Connect to the Lightstreamer server using WebSocket.
431        let ws_stream = match connect_async(request).await {
432            Ok((ws_stream, response)) => {
433                if let Some(server_header) = response.headers().get("server") {
434                    self.make_log(
435                        Level::INFO,
436                        &format!(
437                            "Connected to Lightstreamer server: {}",
438                            server_header.to_str().unwrap_or("")
439                        ),
440                    );
441                } else {
442                    self.make_log(Level::INFO, "Connected to Lightstreamer server");
443                }
444                ws_stream
445            }
446            Err(err) => {
447                return Err(LightstreamerError::connection(format!(
448                    "Failed to connect to Lightstreamer server with WebSocket: {}",
449                    err
450                )));
451            }
452        };
453
454        // Split the WebSocket stream into a write and a read stream.
455        let (mut write_stream, mut read_stream) = ws_stream.split();
456
457        //
458        // Initiate communication with the server by sending a 'wsok' message.
459        //
460        write_stream.send(Message::Text("wsok".into())).await?;
461
462        //
463        // Start reading and processing messages from the server.
464        //
465        let mut is_connected = false;
466        let mut request_id: usize = 0;
467        let mut session_id: Option<String> = None;
468        let mut subscription_id: usize = 0;
469        let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> =
470            HashMap::new();
471
472        //
473        // Liveness enforcement and reverse heartbeat.
474        //
475        // The server guarantees that some message (data or `probe`) is sent at
476        // least every keepalive interval. If nothing arrives within the
477        // effective keepalive window plus a grace period, the connection is
478        // considered dead (half-open TCP) and an error is returned so the
479        // caller can reconnect. The window starts from the configured
480        // keepalive interval (if any) and is widened to the server-declared
481        // keepalive from the `conok` message once known.
482        //
483        let configured_keepalive_ms = self.connection_options.get_keepalive_interval();
484        let reverse_heartbeat_ms = self.connection_options.get_reverse_heartbeat_interval();
485        let mut keepalive_timeout_ms: u64 = configured_keepalive_ms;
486        let mut last_message_at = TokioInstant::now();
487
488        let mut heartbeat_timer = if reverse_heartbeat_ms > 0 {
489            let mut timer = tokio::time::interval(Duration::from_millis(reverse_heartbeat_ms));
490            timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
491            Some(timer)
492        } else {
493            None
494        };
495
496        loop {
497            let liveness_deadline = last_message_at
498                + Duration::from_millis(
499                    keepalive_timeout_ms.saturating_add(Self::KEEPALIVE_GRACE_MS),
500                );
501            tokio::select! {
502                message = read_stream.next() => {
503                    last_message_at = TokioInstant::now();
504                    match message {
505                        Some(Ok(Message::Text(text))) => {
506                            // Messages could include multiple submessages separated by /r/n.
507                            // Split the message into submessages and process each one separately.
508                            let submessages: Vec<&str> = text.split("\r\n")
509                                .filter(|&line| !line.trim().is_empty()) // Filter out empty lines.
510                                .collect();
511                            for submessage in submessages {
512                                let clean_text = clean_message(submessage);
513                                let submessage_fields: Vec<&str> = clean_text.split(",").collect();
514                                match *submessage_fields.first().unwrap_or(&"") {
515                                    //
516                                    // Errors from server.
517                                    //
518                                    //
519                                    // Session-level error: the session is not usable — fail the
520                                    // connection so the caller can reconnect. (The previous
521                                    // `break` only exited the submessage loop, leaving a dead
522                                    // session looping forever.)
523                                    //
524                                    "conerr" => {
525                                        self.make_log( Level::ERROR, &format!("Received connection error from Lightstreamer server: {}", clean_text) );
526                                        return Err(LightstreamerError::connection(format!(
527                                            "connection error from server: {}",
528                                            clean_text
529                                        )));
530                                    },
531                                    //
532                                    // Request-level error: the session survives — log and continue.
533                                    //
534                                    "reqerr" => {
535                                        self.make_log( Level::ERROR, &format!("Received request error from Lightstreamer server: {}", clean_text) );
536                                    },
537                                    //
538                                    // Session created successfully.
539                                    //
540                                    "conok" => {
541                                        is_connected = true;
542                                        // CONOK,<session-ID>,<request-limit>,<keep-alive>,<control-link>
543                                        // Widen the liveness window to the server-declared keepalive
544                                        // so enforcement works even when no keepalive was configured.
545                                        if let Some(server_keepalive_ms) = submessage_fields
546                                            .get(3)
547                                            .and_then(|v| v.trim().parse::<u64>().ok())
548                                            .filter(|v| *v > 0)
549                                        {
550                                            keepalive_timeout_ms = widen_keepalive_timeout(
551                                                keepalive_timeout_ms,
552                                                server_keepalive_ms,
553                                            );
554                                            self.make_log( Level::DEBUG, &format!("Server-declared keepalive interval: {} ms", server_keepalive_ms) );
555                                        }
556                                        if let Some(conok_session_id) = submessage_fields.get(1) {
557                                            session_id = Some((*conok_session_id).to_string());
558                                            self.make_log( Level::DEBUG, &format!("Session creation confirmed by server: {}", clean_text) );
559                                            self.make_log( Level::DEBUG, &format!("Session created with ID: {:?}", conok_session_id) );
560                                            //
561                                            // Subscribe to the desired items.
562                                            //
563                                            while let Some(subscription) = self.subscriptions.get_mut(subscription_id) {
564                                                //
565                                                // Gather all the necessary subscription parameters.
566                                                //
567                                                subscription_id += 1;
568                                                request_id += 1;
569                                                subscription.id = subscription_id;
570                                                subscription.id_sender.try_send(subscription_id)?;
571
572                                                let encoded_params = match Self::get_subscription_params(subscription, request_id)
573                                                {
574                                                    Ok(params) => params,
575                                                    Err(err) => {
576                                                        return Err(err);
577                                                    },
578                                                };
579
580                                                write_stream
581                                                    .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
582                                                    .await?;
583                                                debug!("Sent subscription request: '{}'", encoded_params);
584                                            }
585                                        } else {
586                                            return Err(LightstreamerError::protocol(
587                                                "Session ID not found in 'conok' message from server",
588                                            ));
589                                        }
590                                    },
591                                    //
592                                    // Notifications from server.
593                                    //
594                                    "conf" | "cons" | "clientip" | "servname" | "prog" | "sync" | "eos" => {
595                                        self.make_log( Level::INFO, &format!("Received notification from server: {}", clean_text) );
596                                        // Don't do anything with these notifications for now.
597                                    },
598                                    "probe" => {
599                                        self.make_log( Level::DEBUG, &format!("Received probe message from server: {}", clean_text ) );
600                                    },
601                                    "reqok" => {
602                                        self.make_log( Level::DEBUG, &format!("Received reqok message from server: '{}'", clean_text ) );
603                                    },
604                                    //
605                                    // Subscription confirmation from server.
606                                    //
607                                    "subok" => {
608                                        self.make_log( Level::INFO, &format!("Subscription confirmed by server: '{}'", clean_text) );
609                                    },
610                                    //
611                                    // Usubscription confirmation from server.
612                                    //
613                                    "unsub" => {
614                                        self.make_log( Level::INFO, &format!("Unsubscription confirmed by server: '{}'", clean_text) );
615                                    },
616                                    //
617                                    // Data updates from server.
618                                    //
619                                    "u" => {
620                                        // Parse arguments from the received message.
621                                        let arguments = parse_arguments(&clean_text);
622                                        //
623                                        // Extract the subscription from the first argument.
624                                        //
625                                        let subscription_index = arguments.get(1).unwrap_or(&"").parse::<usize>().unwrap_or(0);
626                                        let subscription = match get_subscription_by_id(self.get_subscriptions(), subscription_index) {
627                                            Some(subscription) => subscription,
628                                            None => {
629                                                self.make_log( Level::WARN, &format!("Subscription not found for index: {}", subscription_index) );
630                                                continue;
631
632                                            }
633                                        };
634                                        //
635                                        // Extract the item from the second argument.
636                                        //
637                                        let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0);
638                                        let item = match subscription.get_items() {
639                                            Some(items) => items.get(item_index-1),
640                                            None => {
641                                                self.make_log( Level::WARN, &format!("No items found in subscription: {:?}", subscription) );
642                                                continue;
643                                            }
644                                        };
645                                        //
646                                        // Determine if the update is a snapshot or real-time update based on the subscription parameters.
647                                        //
648                                        let is_snapshot = match subscription.get_requested_snapshot() {
649                                            Some(ls_snapshot) => {
650                                                match ls_snapshot {
651                                                    Snapshot::No => false,
652                                                    Snapshot::Yes => {
653                                                        match subscription.get_mode() {
654                                                            SubscriptionMode::Merge => {
655                                                                if arguments.len() == 4 && arguments[3] == "$" {
656                                                                    // EOS notification received
657                                                                    true
658                                                                } else {
659                                                                    // If item doesn't exist in item_updates yet, the first update
660                                                                    // is always a snapshot.
661                                                                    if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) {
662                                                                        if item_updates.get(&(item_index)).is_some() {
663                                                                            // Item update already exists in item_updates, so it's not a snapshot.
664                                                                            false
665                                                                        } else {
666                                                                            // Item update doesn't exist in item_updates, so the first update is always a snapshot.
667                                                                            true
668                                                                        }
669                                                                    } else {
670                                                                        // Item updates not found for subscription, so the first update is always a snapshot.
671                                                                        true
672                                                                    }
673                                                                }
674                                                            },
675                                                            SubscriptionMode::Distinct | SubscriptionMode::Command => {
676                                                                !subscription.is_subscribed()
677                                                            },
678                                                            _ => false,
679                                                        }
680                                                    },
681                                                    _ => false,
682                                                }
683                                            },
684                                            None => false,
685                                        };
686
687                                        // Extract the field values from the third argument.
688                                        let field_values: Vec<&str> = arguments.get(3).unwrap_or(&"").split('|').collect();
689
690                                        //
691                                        // Get fields from subscription and create a HashMap of field names and values.
692                                        //
693                                        let subscription_fields = subscription.get_fields();
694                                        let mut field_map: HashMap<String, Option<String>> = subscription_fields
695                                            .map(|fields| fields.iter().map(|field_name| (field_name.to_string(), None)).collect())
696                                            .unwrap_or_default();
697
698                                        let mut field_index = 0;
699                                        for value in field_values {
700                                            match value {
701                                                "" => {
702                                                    // An empty value means the field is unchanged compared to the previous update of the same field.
703                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
704                                                        field_map.insert(field_name.to_string(), None);
705                                                    }
706                                                    field_index += 1;
707                                                }
708                                                "#" | "$" => {
709                                                    // A value corresponding to a hash sign "#" or dollar sign "$" means the field is null or empty.
710                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
711                                                        field_map.insert(field_name.to_string(), Some("".to_string()));
712                                                    }
713                                                    field_index += 1;
714                                                }
715                                                value if value.starts_with('^') => {
716                                                    let command = value.chars().nth(1).unwrap_or(' ');
717                                                    match command {
718                                                        '0'..='9' => {
719                                                            let count = value[1..].parse().unwrap_or(0);
720                                                            for i in 0..count {
721                                                                if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index + i)) {
722                                                                    field_map.insert(field_name.to_string(), None);
723                                                                }
724                                                            }
725                                                            field_index += count;
726                                                        }
727                                                        'P' | 'T' => {
728                                                            let diff_value = serde_urlencoded::from_str(&value[2..]).unwrap_or_else(|_| value[2..].to_string());
729                                                            if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index))
730                                                                && let Some(prev_value) = field_map.get(field_name).and_then(|v| v.as_ref()) {
731                                                                    let new_value = match command {
732                                                                        'P' => {
733                                                                            // Apply JSON Patch
734                                                                            let patch: serde_json::Value = serde_json::from_str(&diff_value).unwrap_or(serde_json::Value::Null);
735                                                                            let mut prev_json: serde_json::Value = serde_json::from_str(prev_value).unwrap_or(serde_json::Value::Null);
736                                                                            let patch_operations: Vec<json_patch::PatchOperation> = serde_json::from_value(patch).unwrap_or_default();
737                                                                            if let Err(e) = json_patch::patch(&mut prev_json, &patch_operations) {
738                                                                                tracing::warn!("Failed to apply JSON patch: {}", e);
739                                                                            }
740                                                                            prev_json.to_string()
741                                                                        }
742                                                                        'T' => {
743                                                                            // Apply TLCP-diff
744                                                                            //tlcp_diff::apply_diff(prev_value, &diff_value).unwrap_or_else(|_| prev_value.to_string())
745                                                                            unimplemented!("Implement TLCP-diff");
746                                                                        }
747                                                                        _ => unreachable!(),
748                                                                    };
749                                                                    field_map.insert(field_name.to_string(), Some(new_value.to_string()));
750                                                                }
751                                                            field_index += 1;
752                                                        }
753                                                        _ => {
754                                                            let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
755                                                            if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
756                                                                field_map.insert(field_name.to_string(), Some(decoded_value));
757                                                            }
758                                                            field_index += 1;
759                                                        }
760                                                    }
761                                                }
762                                                value if value.starts_with('{') => {
763                                                    // in this case it is a json payload that we will let the consumer handle. In this case, it is important
764                                                    // to preserve casing for parsing.
765                                                    let original_json = parse_arguments(submessage).get(3).unwrap_or(&"").split('|').collect::<Vec<&str>>();
766                                                    let mut payload = "";
767                                                    for json in original_json.iter()
768                                                    {
769                                                        if json.is_empty() || *json == "#"
770                                                        {
771                                                            continue;
772                                                        }
773
774                                                        payload = json;
775                                                    }
776
777                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
778                                                        field_map.insert(field_name.to_string(), Some(payload.to_string()));
779                                                    }
780                                                    field_index += 1;
781                                                }
782                                                _ => {
783                                                    let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
784                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
785                                                        field_map.insert(field_name.to_string(), Some(decoded_value));
786                                                    }
787                                                    field_index += 1;
788                                                }
789                                            }
790                                        }
791
792                                        // Store only item_update's changed fields.
793                                        let changed_fields: HashMap<String, String> = field_map.iter()
794                                            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
795                                            .collect();
796
797                                        //
798                                        // Take the proper item_update from item_updates and update it with changed fields.
799                                        // If the item_update doesn't exist yet, create a new one.
800                                        //
801                                        let current_item_update: ItemUpdate;
802                                        match subscription_item_updates.get_mut(&(subscription_index)) {
803                                            Some(item_updates) => match item_updates.get_mut(&(item_index)) {
804                                                Some(item_update) => {
805                                                    //
806                                                    // Iterate changed_fields and update existing item_update.fields assigning the new values.
807                                                    //
808                                                    for (field_name, new_value) in &changed_fields {
809                                                        if item_update.fields.contains_key(field_name) {
810                                                            item_update.fields.insert((*field_name).clone(), Some(new_value.clone()));
811                                                        }
812                                                    }
813                                                    item_update.changed_fields = changed_fields.clone();
814                                                    item_update.is_snapshot = is_snapshot;
815                                                    current_item_update = item_update.clone();
816                                                },
817                                                None => {
818                                                    // Create a new item_update and add it to item_updates.
819                                                    let item_update = ItemUpdate {
820                                                        item_name: item.cloned(),
821                                                        item_pos: item_index,
822                                                        fields: field_map.clone(),
823                                                        changed_fields: changed_fields.clone(),
824                                                        is_snapshot,
825                                                    };
826                                                    current_item_update = item_update.clone();
827                                                    item_updates.insert(item_index, item_update);
828                                                }
829                                            },
830                                            None => {
831                                                // Create a new item_update and add it to item_updates.
832                                                let item_update = ItemUpdate {
833                                                    item_name: item.cloned(),
834                                                    item_pos: item_index,
835                                                    fields: field_map,
836                                                    changed_fields,
837                                                    is_snapshot,
838                                                };
839                                                current_item_update = item_update.clone();
840                                                let mut item_updates = HashMap::new();
841                                                item_updates.insert(item_index, item_update);
842                                                subscription_item_updates.insert(subscription_index, item_updates);
843                                            }
844                                        };
845
846                                        // Get mutable subscription listeners directly.
847                                        let subscription_listeners = subscription.get_listeners();
848
849                                        // Iterate subscription listeners and call on_item_update for each listener.
850                                        for listener in subscription_listeners {
851                                            listener.on_item_update(&current_item_update);
852                                        }
853                                    }
854                                    //
855                                    // Connection confirmation from server.
856                                    //
857                                    "wsok" => {
858                                        self.make_log( Level::INFO, &format!("Connection confirmed by server: '{}'", clean_text) );
859                                        //
860                                        // Request session creation.
861                                        //
862                                        let ls_adapter_set = match self.connection_details.get_adapter_set() {
863                                            Some(adapter_set) => adapter_set,
864                                            None => {
865                                                return Err(LightstreamerError::invalid_state(
866                                                    "No adapter set found in connection details.",
867                                                ));
868                                            },
869                                        };
870                                        let ls_send_sync = self.connection_options.get_send_sync().to_string();
871                                        let ls_keepalive_millis = configured_keepalive_ms.to_string();
872                                        let ls_inactivity_millis = reverse_heartbeat_ms.to_string();
873                                        let mut params: Vec<(&str, &str)> = vec![
874                                            ("LS_adapter_set", ls_adapter_set),
875                                            ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"),
876                                            ("LS_send_sync", &ls_send_sync),
877                                        ];
878                                        if configured_keepalive_ms > 0 {
879                                            params.push(("LS_keepalive_millis", &ls_keepalive_millis));
880                                        }
881                                        if reverse_heartbeat_ms > 0 {
882                                            params.push(("LS_inactivity_millis", &ls_inactivity_millis));
883                                        }
884                                        if let Some(user) = &self.connection_details.get_user() {
885                                            params.push(("LS_user", user));
886                                        }
887                                        if let Some(password) = &self.connection_details.get_password() {
888                                            params.push(("LS_password", password));
889                                        }
890                                        params.push(("LS_protocol", Self::TLCP_VERSION));
891                                        let encoded_params = serde_urlencoded::to_string(&params)?;
892                                        write_stream
893                                            .send(Message::Text(format!("create_session\r\n{}\n", encoded_params).into()))
894                                            .await?;
895                                        self.make_log( Level::DEBUG, &format!("Sent create session request: '{}'", encoded_params) );
896                                    },
897                                    unexpected_message => {
898                                        return Err(LightstreamerError::protocol(format!(
899                                            "Unexpected message received from server: '{:?}' (full message: '{}')",
900                                            unexpected_message, clean_text
901                                        )));
902                                    },
903                                }
904                            }
905                        },
906                        Some(Ok(non_text_message)) => {
907                            return Err(LightstreamerError::protocol(format!(
908                                "Unexpected non-text message from server: {:?}",
909                                non_text_message
910                            )));
911                        },
912                        Some(Err(err)) => {
913                            return Err(LightstreamerError::protocol(format!(
914                                "Error reading message from server: {}",
915                                err
916                            )));
917                        },
918                        None => {
919                            self.make_log( Level::DEBUG, "No more messages from server" );
920                            break;
921                        },
922                    }
923                },
924                Some(subscription_request) = self.subscription_receiver.recv() => {
925                    request_id += 1;
926                    // Process subscription requests.
927                    if let Some(subscription) = subscription_request.subscription
928                    {
929                        self.subscriptions.push(subscription);
930
931                        // if we are not connected yet, we will subscribe later
932                        if !is_connected {
933                            continue;
934                        }
935
936                        subscription_id += 1;
937                        // SAFETY: We just pushed a subscription, so last_mut() must return Some.
938                        // Using debug_assert to catch invariant violations during development.
939                        let sub = self.subscriptions.last_mut();
940                        debug_assert!(sub.is_some(), "subscriptions.last_mut() returned None after push()");
941                        if let Some(sub) = sub {
942                            sub.id = subscription_id;
943                            sub.id_sender.try_send(subscription_id)?;
944                        }
945
946                        // SAFETY: We just pushed a subscription, so last() must return Some.
947                        // Extract encoded_params in a separate scope to avoid borrow across await.
948                        let encoded_params = {
949                            let last_sub = self.subscriptions.last();
950                            debug_assert!(last_sub.is_some(), "subscriptions.last() returned None after push()");
951                            let Some(last_sub) = last_sub else {
952                                return Err(LightstreamerError::invalid_state(
953                                    "subscriptions.last() returned None immediately after push()"
954                                ));
955                            };
956                            Self::get_subscription_params(last_sub, request_id)?
957                        };
958
959                        write_stream
960                            .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
961                            .await?;
962
963                        self.make_log( Level::INFO, &format!("Sent subscription request: '{}'", encoded_params) );
964                    }
965                    // Process unsubscription requests.
966                    else if let Some(unsubscription_id) = subscription_request.subscription_id
967                    {
968                        let encoded_params = match Self::get_unsubscription_params(unsubscription_id, request_id)
969                        {
970                            Ok(params) => params,
971                            Err(err) => {
972                                return Err(err);
973                            },
974                        };
975
976                        write_stream
977                            .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
978                            .await?;
979
980                        self.make_log( Level::INFO, &format!("Sent unsubscription request: '{}'", encoded_params) );
981
982                        self.subscriptions.retain(|s| s.id != unsubscription_id);
983
984                        if self.subscriptions.is_empty()
985                        {
986                            self.make_log( Level::INFO, "No more subscriptions, disconnecting" );
987                            shutdown_signal.notify_one();
988                        }
989                    }
990                },
991                _ = shutdown_signal.notified() => {
992                    self.make_log( Level::INFO, "Received shutdown signal" );
993                    break;
994                },
995                //
996                // Liveness watchdog: nothing received within the keepalive
997                // window plus grace — the connection is half-open / dead.
998                //
999                _ = tokio::time::sleep_until(liveness_deadline), if keepalive_timeout_ms > 0 => {
1000                    let idle_ms = keepalive_timeout_ms.saturating_add(Self::KEEPALIVE_GRACE_MS);
1001                    self.make_log(
1002                        Level::ERROR,
1003                        &format!(
1004                            "No message received from server within {} ms (keepalive + grace); connection considered dead",
1005                            idle_ms
1006                        ),
1007                    );
1008                    return Err(LightstreamerError::connection(format!(
1009                        "no message received from server within {} ms (keepalive + grace)",
1010                        idle_ms
1011                    )));
1012                },
1013                //
1014                // Reverse heartbeat: prove client liveness to the server and
1015                // surface write errors promptly on a dead connection.
1016                //
1017                _ = async {
1018                    match heartbeat_timer.as_mut() {
1019                        Some(timer) => { timer.tick().await; },
1020                        None => std::future::pending::<()>().await,
1021                    }
1022                }, if is_connected && session_id.is_some() => {
1023                    // TLCP requires every request to carry at least one
1024                    // parameter — an empty body is rejected with
1025                    // "error,67,Empty request unexpected".
1026                    if let Some(id) = session_id.as_deref() {
1027                        let params = serde_urlencoded::to_string([("LS_session", id)])?;
1028                        write_stream
1029                            .send(Message::Text(format!("heartbeat\r\n{}", params).into()))
1030                            .await?;
1031                        self.make_log(Level::DEBUG, "Sent reverse heartbeat to server");
1032                    }
1033                },
1034            }
1035        }
1036
1037        Ok(())
1038    }
1039
1040    /// Operation method that requests to close the Session opened against the configured Lightstreamer
1041    /// Server (if any).
1042    ///
1043    /// When `disconnect()` is called, the "Stream-Sense" mechanism is stopped.
1044    ///
1045    /// Note that active `Subscription` instances, associated with this `LightstreamerClient` instance,
1046    /// are preserved to be re-subscribed to on future Sessions.
1047    ///
1048    /// Note that the request to disconnect is accomplished by the client in a separate thread; this
1049    /// means that an invocation to `getStatus()` right after `disconnect()` might not reflect the
1050    /// change yet.
1051    ///
1052    /// When the request to disconnect is finally being executed, if the status of the client is
1053    /// "DISCONNECTED", then nothing will be done.
1054    ///
1055    /// See also `connect()`
1056    #[instrument(level = "trace")]
1057    pub async fn disconnect(&mut self) {
1058        self.make_log(Level::INFO, "Disconnecting from Lightstreamer server");
1059
1060        // If auto-reconnect is enabled and we have a connection manager, stop it
1061        if self.auto_reconnect_enabled {
1062            if let Some(ref manager) = self.connection_manager {
1063                manager.shutdown().await;
1064            }
1065            self.connection_manager = None;
1066        }
1067
1068        // Update status to disconnected
1069        self.status = ClientStatus::Disconnected(DisconnectionType::WillRetry);
1070
1071        // Notify listeners about status change
1072        for listener in &self.listeners {
1073            listener.on_status_change(&self.status.to_string());
1074        }
1075    }
1076
1077    /// Static inquiry method that can be used to share cookies between connections to the Server
1078    /// (performed by this library) and connections to other sites that are performed by the application.
1079    /// With this method, cookies received from the Server can be extracted for sending through other
1080    /// connections, according with the URI to be accessed.
1081    ///
1082    /// See `addCookies()` for clarifications on when cookies are directly stored by the library and
1083    /// when not.
1084    ///
1085    /// # Parameters
1086    ///
1087    /// * `uri`: the URI to which the cookies should be sent, or `None`.
1088    ///
1089    /// # Returns
1090    ///
1091    /// A list with the various cookies that can be sent in a HTTP request for the specified URI.
1092    /// If a `None` URI was supplied, all available non-expired cookies will be returned.
1093    pub fn get_cookies(_uri: Option<&str>) -> Cookie<'_> {
1094        // Implementation for get_cookies
1095        unimplemented!()
1096    }
1097
1098    /// Returns a list containing the `ClientListener` instances that were added to this client.
1099    ///
1100    /// # Returns
1101    ///
1102    /// A list containing the listeners that were added to this client.
1103    ///
1104    /// See also `addListener()`
1105    pub fn get_listeners(&self) -> &Vec<Box<dyn ClientListener>> {
1106        &self.listeners
1107    }
1108
1109    /// Inquiry method that gets the current client status and transport (when applicable).
1110    ///
1111    /// # Returns
1112    ///
1113    /// The current client status. It can be one of the following values:
1114    ///
1115    /// - `"CONNECTING"`: the client is waiting for a Server's response in order to establish a connection;
1116    /// - `"CONNECTED:STREAM-SENSING"`: the client has received a preliminary response from the server
1117    ///   and is currently verifying if a streaming connection is possible;
1118    /// - `"CONNECTED:WS-STREAMING"`: a streaming connection over WebSocket is active;
1119    /// - `"CONNECTED:HTTP-STREAMING"`: a streaming connection over HTTP is active;
1120    /// - `"CONNECTED:WS-POLLING"`: a polling connection over WebSocket is in progress;
1121    /// - `"CONNECTED:HTTP-POLLING"`: a polling connection over HTTP is in progress;
1122    /// - `"STALLED"`: the Server has not been sending data on an active streaming connection for
1123    ///   longer than a configured time;
1124    /// - `"DISCONNECTED:WILL-RETRY"`: no connection is currently active but one will be opened
1125    ///   (possibly after a timeout);
1126    /// - `"DISCONNECTED:TRYING-RECOVERY"`: no connection is currently active, but one will be opened
1127    ///   as soon as possible, as an attempt to recover the current session after a connection issue;
1128    /// - `"DISCONNECTED"`: no connection is currently active.
1129    ///
1130    /// See also `ClientListener.onStatusChange()`
1131    pub fn get_status(&self) -> &ClientStatus {
1132        &self.status
1133    }
1134
1135    /// Inquiry method that returns a list containing all the `Subscription` instances that are
1136    /// currently "active" on this `LightstreamerClient`.
1137    ///
1138    /// Internal second-level `Subscription` are not included.
1139    ///
1140    /// # Returns
1141    ///
1142    /// A list, containing all the `Subscription` currently "active" on this `LightstreamerClient`.
1143    /// The list can be empty.
1144    ///
1145    /// See also `subscribe()`
1146    pub fn get_subscriptions(&self) -> &Vec<Subscription> {
1147        &self.subscriptions
1148    }
1149
1150    /// Creates a new instance of `LightstreamerClient`.
1151    ///
1152    /// The constructor initializes the client with the server address and adapter set, if provided.
1153    /// It sets up the connection details and options for the client. If no server address or
1154    /// adapter set is specified, those properties on the client will be `None`. This allows
1155    /// for late configuration of these details before connecting to the Lightstreamer server.
1156    ///
1157    /// # Arguments
1158    /// * `server_address` - An optional reference to a string slice that represents the server
1159    ///   address to connect to. If `None`, the server address must be set later.
1160    /// * `adapter_set` - An optional reference to a string slice that specifies the adapter set name.
1161    ///   If `None`, the adapter set must be set later.
1162    ///
1163    /// # Returns
1164    /// A result containing the new `LightstreamerClient` instance if successful, or an
1165    /// `IllegalStateException` if the initialization fails due to invalid state conditions.
1166    ///
1167    /// # Panics
1168    /// Does not panic under normal circumstances. However, unexpected internal errors during
1169    /// the creation of internal components could cause panics, which should be considered when
1170    /// using this function in production code.
1171    ///
1172    pub fn new(
1173        server_address: Option<&str>,
1174        adapter_set: Option<&str>,
1175        username: Option<&str>,
1176        password: Option<&str>,
1177    ) -> Result<LightstreamerClient, LightstreamerError> {
1178        let connection_details =
1179            ConnectionDetails::new(server_address, adapter_set, username, password)?;
1180        let connection_options = ConnectionOptions::default();
1181        let (subscription_sender, subscription_receiver) = channel(100);
1182
1183        Ok(LightstreamerClient {
1184            server_address: server_address.map(|s| s.to_string()),
1185            adapter_set: adapter_set.map(|s| s.to_string()),
1186            connection_details,
1187            connection_options,
1188            listeners: Vec::new(),
1189            subscriptions: Vec::new(),
1190            status: ClientStatus::Disconnected(DisconnectionType::WillRetry),
1191            logging: LogType::StdLogs,
1192            subscription_sender,
1193            subscription_receiver,
1194            connection_manager: None,
1195            reconnection_config: ReconnectionConfig::default(),
1196            heartbeat_config: HeartbeatConfig::default(),
1197            auto_reconnect_enabled: false,
1198        })
1199    }
1200
1201    /// Removes a listener from the `LightstreamerClient` instance so that it will not receive
1202    /// events anymore.
1203    ///
1204    /// A listener can be removed at any time.
1205    ///
1206    /// # Parameters
1207    ///
1208    /// * `listener`: The listener to be removed.
1209    ///
1210    /// See also `addListener()`
1211    pub fn remove_listener(&mut self, _listener: Box<dyn ClientListener>) {
1212        unimplemented!("Implement mechanism to remove listener from LightstreamerClient");
1213        //self.listeners.remove(&listener);
1214    }
1215
1216    /// Operation method that sends a message to the Server. The message is interpreted and handled
1217    /// by the Metadata Adapter associated to the current Session. This operation supports in-order
1218    /// guaranteed message delivery with automatic batching. In other words, messages are guaranteed
1219    /// to arrive exactly once and respecting the original order, whatever is the underlying transport
1220    /// (HTTP or WebSockets). Furthermore, high frequency messages are automatically batched, if
1221    /// necessary, to reduce network round trips.
1222    ///
1223    /// Upon subsequent calls to the method, the sequential management of the involved messages is
1224    /// guaranteed. The ordering is determined by the order in which the calls to `sendMessage` are
1225    /// issued.
1226    ///
1227    /// If a message, for any reason, doesn't reach the Server (this is possible with the HTTP transport),
1228    /// it will be resent; however, this may cause the subsequent messages to be delayed. For this
1229    /// reason, each message can specify a "delayTimeout", which is the longest time the message,
1230    /// after reaching the Server, can be kept waiting if one of more preceding messages haven't
1231    /// been received yet. If the "delayTimeout" expires, these preceding messages will be discarded;
1232    /// any discarded message will be notified to the listener through `ClientMessageListener.onDiscarded()`.
1233    /// Note that, because of the parallel transport of the messages, if a zero or very low timeout
1234    /// is set for a message and the previous message was sent immediately before, it is possible
1235    /// that the latter gets discarded even if no communication issues occur. The Server may also
1236    /// enforce its own timeout on missing messages, to prevent keeping the subsequent messages for
1237    /// long time.
1238    ///
1239    /// Sequence identifiers can also be associated with the messages. In this case, the sequential
1240    /// management is restricted to all subsets of messages with the same sequence identifier associated.
1241    ///
1242    /// Notifications of the operation outcome can be received by supplying a suitable listener. The
1243    /// supplied listener is guaranteed to be eventually invoked; listeners associated with a sequence
1244    /// are guaranteed to be invoked sequentially.
1245    ///
1246    /// The "UNORDERED_MESSAGES" sequence name has a special meaning. For such a sequence, immediate
1247    /// processing is guaranteed, while strict ordering and even sequentialization of the processing
1248    /// is not enforced. Likewise, strict ordering of the notifications is not enforced. However,
1249    /// messages that, for any reason, should fail to reach the Server whereas subsequent messages
1250    /// had succeeded, might still be discarded after a server-side timeout, in order to ensure that
1251    /// the listener eventually gets a notification.
1252    ///
1253    /// Moreover, if "UNORDERED_MESSAGES" is used and no listener is supplied, a "fire and forget"
1254    /// scenario is assumed. In this case, no checks on missing, duplicated or overtaken messages
1255    /// are performed at all, so as to optimize the processing and allow the highest possible throughput.
1256    ///
1257    /// Since a message is handled by the Metadata Adapter associated to the current connection, a
1258    /// message can be sent only if a connection is currently active. If the special `enqueueWhileDisconnected`
1259    /// flag is specified it is possible to call the method at any time and the client will take
1260    /// care of sending the message as soon as a connection is available, otherwise, if the current
1261    /// status is "DISCONNECTED*", the message will be abandoned and the `ClientMessageListener.onAbort()`
1262    /// event will be fired.
1263    ///
1264    /// Note that, in any case, as soon as the status switches again to "DISCONNECTED*", any message
1265    /// still pending is aborted, including messages that were queued with the `enqueueWhileDisconnected`
1266    /// flag set to `true`.
1267    ///
1268    /// Also note that forwarding of the message to the server is made in a separate thread, hence,
1269    /// if a message is sent while the connection is active, it could be aborted because of a subsequent
1270    /// disconnection. In the same way a message sent while the connection is not active might be
1271    /// sent because of a subsequent connection.
1272    ///
1273    /// # Parameters
1274    ///
1275    /// * `message`: a text message, whose interpretation is entirely demanded to the Metadata Adapter
1276    ///   associated to the current connection.
1277    /// * `sequence`: an alphanumeric identifier, used to identify a subset of messages to be managed
1278    ///   in sequence; underscore characters are also allowed. If the "UNORDERED_MESSAGES" identifier
1279    ///   is supplied, the message will be processed in the special way described above. The parameter
1280    ///   is optional; if set to `None`, "UNORDERED_MESSAGES" is used as the sequence name.
1281    /// * `delay_timeout`: a timeout, expressed in milliseconds. If higher than the Server configured
1282    ///   timeout on missing messages, the latter will be used instead. The parameter is optional; if
1283    ///   a negative value is supplied, the Server configured timeout on missing messages will be applied.
1284    ///   This timeout is ignored for the special "UNORDERED_MESSAGES" sequence, although a server-side
1285    ///   timeout on missing messages still applies.
1286    /// * `listener`: an object suitable for receiving notifications about the processing outcome. The
1287    ///   parameter is optional; if not supplied, no notification will be available.
1288    /// * `enqueue_while_disconnected`: if this flag is set to `true`, and the client is in a disconnected
1289    ///   status when the provided message is handled, then the message is not aborted right away but
1290    ///   is queued waiting for a new session. Note that the message can still be aborted later when
1291    ///   a new session is established.
1292    pub fn send_message(
1293        &mut self,
1294        message: &str,
1295        sequence: Option<&str>,
1296        _delay_timeout: Option<u64>,
1297        listener: Option<Box<dyn ClientMessageListener>>,
1298        enqueue_while_disconnected: bool,
1299    ) {
1300        let _sequence = sequence.unwrap_or("UNORDERED_MESSAGES");
1301
1302        // Handle the message based on the current connection status
1303        match &self.status {
1304            ClientStatus::Connected(_connection_type) => {
1305                // Send the message to the server in a separate thread
1306                // ...
1307            }
1308            ClientStatus::Disconnected(_disconnection_type) => {
1309                if enqueue_while_disconnected {
1310                    // Enqueue the message to be sent when a connection is available
1311                    // ...
1312                } else {
1313                    // Abort the message and notify the listener
1314                    if let Some(listener) = listener {
1315                        listener.on_abort(message, false);
1316                    }
1317                }
1318            }
1319            _ => {
1320                // Enqueue the message to be sent when a connection is available
1321                // ...
1322            }
1323        }
1324        unimplemented!("Complete mechanism to send message to LightstreamerClient.");
1325    }
1326
1327    /// Static method that permits to configure the logging system used by the library. The logging
1328    /// system must respect the `LoggerProvider` interface. A custom class can be used to wrap any
1329    /// third-party logging system.
1330    ///
1331    /// If no logging system is specified, all the generated log is discarded.
1332    ///
1333    /// The following categories are available to be consumed:
1334    ///
1335    /// - `lightstreamer.stream`: logs socket activity on Lightstreamer Server connections; at INFO
1336    ///   level, socket operations are logged; at DEBUG level, read/write data exchange is logged.
1337    /// - `lightstreamer.protocol`: logs requests to Lightstreamer Server and Server answers; at INFO
1338    ///   level, requests are logged; at DEBUG level, request details and events from the Server are logged.
1339    /// - `lightstreamer.session`: logs Server Session lifecycle events; at INFO level, lifecycle events
1340    ///   are logged; at DEBUG level, lifecycle event details are logged.
1341    /// - `lightstreamer.subscriptions`: logs subscription requests received by the clients and the related
1342    ///   updates; at WARN level, alert events from the Server are logged; at INFO level, subscriptions
1343    ///   and unsubscriptions are logged; at DEBUG level, requests batching and update details are logged.
1344    /// - `lightstreamer.actions`: logs settings / API calls.
1345    ///
1346    /// # Parameters
1347    ///
1348    /// * `provider`: A `LoggerProvider` instance that will be used to generate log messages by the
1349    ///   library classes.
1350    pub fn set_logger_provider() {
1351        unimplemented!("Implement mechanism to set logger provider for LightstreamerClient.");
1352    }
1353    /*
1354    pub fn set_logger_provider(provider: LoggerProvider) {
1355        // Implementation for set_logger_provider
1356    }
1357    */
1358
1359    /// Provides a mean to control the way TLS certificates are evaluated, with the possibility to
1360    /// accept untrusted ones.
1361    ///
1362    /// May be called only once before creating any `LightstreamerClient` instance.
1363    ///
1364    /// # Parameters
1365    ///
1366    /// * `factory`: an instance of `ssl.SSLContext`
1367    ///
1368    /// # Raises
1369    ///
1370    /// * `IllegalArgumentException`: if the factory is `None`
1371    /// * `IllegalStateException`: if a factory is already installed
1372    pub fn set_trust_manager_factory() {
1373        unimplemented!("Implement mechanism to set trust manager factory for LightstreamerClient.");
1374    }
1375    /*
1376    pub fn set_trust_manager_factory(factory: Option<SslContext>) -> Result<(), IllegalArgumentException> {
1377        if factory.is_none() {
1378            return Err(IllegalArgumentException::new(
1379                "Factory cannot be None",
1380            ));
1381        }
1382
1383        // Implementation for set_trust_manager_factory
1384        Ok(())
1385    }
1386    */
1387
1388    /// Adds a subscription to the `LightstreamerClient` instance.
1389    ///
1390    /// Active subscriptions are subscribed to through the server as soon as possible (i.e. as soon
1391    /// as there is a session available). Active `Subscription` are automatically persisted across different
1392    /// sessions as long as a related unsubscribe call is not issued.
1393    ///
1394    /// Subscriptions can be given to the `LightstreamerClient` at any time. Once done the `Subscription`
1395    /// immediately enters the "active" state.
1396    ///
1397    /// Once "active", a `Subscription` instance cannot be provided again to a `LightstreamerClient`
1398    /// unless it is first removed from the "active" state through a call to `unsubscribe()`.
1399    ///
1400    /// Also note that forwarding of the subscription to the server is made in a separate thread.
1401    ///
1402    /// A successful subscription to the server will be notified through a `SubscriptionListener.onSubscription()`
1403    /// event.
1404    ///
1405    /// # Parameters
1406    ///
1407    /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1408    /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1409    ///   values.
1410    ///
1411    /// See also `unsubscribe()`
1412    ///
1413    /// # Errors
1414    ///
1415    /// Returns an error if the subscription channel is closed.
1416    pub async fn subscribe(
1417        subscription_sender: Sender<SubscriptionRequest>,
1418        subscription: Subscription,
1419    ) -> Result<(), LightstreamerError> {
1420        subscription_sender
1421            .send(SubscriptionRequest {
1422                subscription: Some(subscription),
1423                subscription_id: None,
1424            })
1425            .await
1426            .map_err(|e| LightstreamerError::channel(format!("Failed to send subscription: {}", e)))
1427    }
1428
1429    /// If you want to be able to unsubscribe from a subscription, you need to keep track of the id
1430    /// of the subscription. This blocking method allows you to wait for the id of the subscription
1431    /// to be returned.
1432    ///
1433    /// # Parameters
1434    ///
1435    /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1436    /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1437    ///
1438    pub async fn subscribe_get_id(
1439        subscription_sender: Sender<SubscriptionRequest>,
1440        mut subscription: Subscription,
1441    ) -> Result<usize, LightstreamerError> {
1442        // Extract the id_receiver before sending the subscription
1443        let mut id_receiver = subscription.id_receiver;
1444
1445        // Create a new channel for the subscription we're about to send
1446        let (_new_sender, new_receiver) = channel(1);
1447        subscription.id_receiver = new_receiver;
1448
1449        // Send the subscription
1450        LightstreamerClient::subscribe(subscription_sender, subscription).await?;
1451
1452        // Wait for the ID to be updated through the channel
1453        match id_receiver.recv().await {
1454            Some(id) => Ok(id),
1455            None => Err(LightstreamerError::invalid_state(
1456                "Failed to get subscription id",
1457            )),
1458        }
1459    }
1460
1461    /// Operation method that removes a `Subscription` that is currently in the "active" state.
1462    ///
1463    /// By bringing back a `Subscription` to the "inactive" state, the unsubscription from all its
1464    /// items is requested to Lightstreamer Server.
1465    ///
1466    /// Subscription can be unsubscribed from at any time. Once done the `Subscription` immediately
1467    /// exits the "active" state.
1468    ///
1469    /// Note that forwarding of the unsubscription to the server is made in a separate thread.
1470    ///
1471    /// The unsubscription will be notified through a `SubscriptionListener.onUnsubscription()` event.
1472    ///
1473    /// # Parameters
1474    ///
1475    /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1476    /// * `subscription_id`: The id of the subscription to be unsubscribed from.
1477    ///   instance.
1478    ///
1479    /// # Errors
1480    ///
1481    /// Returns an error if the subscription channel is closed.
1482    pub async fn unsubscribe(
1483        subscription_sender: Sender<SubscriptionRequest>,
1484        subscription_id: usize,
1485    ) -> Result<(), LightstreamerError> {
1486        subscription_sender
1487            .send(SubscriptionRequest {
1488                subscription: None,
1489                subscription_id: Some(subscription_id),
1490            })
1491            .await
1492            .map_err(|e| {
1493                LightstreamerError::channel(format!("Failed to send unsubscription: {}", e))
1494            })
1495    }
1496
1497    /// Method setting enum for the logging of this instance.
1498    ///
1499    /// Default logging type is StdLogs, corresponding to `stdout`
1500    ///
1501    /// `LightstreamerClient` has methods for logging that are compatible with the `Tracing` crate.
1502    /// Enabling logging for the `Tracing` crate requires implementation of a tracing subscriber
1503    /// and its configuration and formatting.
1504    ///
1505    /// # Parameters
1506    ///
1507    /// * `logging`: An enum declaring the logging type of this `LightstreamerClient` instance.
1508    pub fn set_logging_type(&mut self, logging: LogType) {
1509        self.logging = logging;
1510    }
1511
1512    /// Method for logging messages
1513    ///
1514    /// Match case wraps log types. `loglevel` param ignored in StdLogs case, all output to stdout.
1515    ///
1516    /// # Parameters
1517    ///
1518    /// * `loglevel` Enum determining use of stdout or Tracing subscriber.
1519    pub fn make_log(&mut self, loglevel: Level, log: &str) {
1520        match self.logging {
1521            LogType::StdLogs => {
1522                debug!("{}", log);
1523            }
1524            LogType::TracingLogs => match loglevel {
1525                Level::INFO => {
1526                    info!(log);
1527                }
1528                Level::WARN => {
1529                    warn!(log);
1530                }
1531                Level::ERROR => {
1532                    error!(log);
1533                }
1534                Level::TRACE => {
1535                    trace!(log);
1536                }
1537                Level::DEBUG => {
1538                    debug!(log);
1539                }
1540            },
1541        }
1542    }
1543
1544    /// Enables automatic reconnection with default configuration.
1545    pub fn enable_auto_reconnect(&mut self) {
1546        self.auto_reconnect_enabled = true;
1547        // ConnectionManager will be created during connect() to avoid circular dependency
1548    }
1549
1550    /// Enables automatic reconnection with the specified configuration.
1551    ///
1552    /// # Parameters
1553    ///
1554    /// * `reconnection_config`: Configuration for reconnection behavior
1555    /// * `heartbeat_config`: Configuration for heartbeat monitoring
1556    pub fn enable_auto_reconnect_with_config(
1557        &mut self,
1558        reconnection_config: ReconnectionConfig,
1559        heartbeat_config: HeartbeatConfig,
1560    ) -> Result<(), LightstreamerError> {
1561        self.auto_reconnect_enabled = true;
1562        self.reconnection_config = reconnection_config;
1563        self.heartbeat_config = heartbeat_config;
1564        self.auto_reconnect_enabled = true;
1565        // ConnectionManager will be created during connect() to avoid circular dependency
1566        Ok(())
1567    }
1568
1569    /// Disables automatic reconnection.
1570    pub async fn disable_auto_reconnect(&mut self) {
1571        self.auto_reconnect_enabled = false;
1572        if let Some(manager) = &self.connection_manager {
1573            manager.shutdown().await;
1574        }
1575        self.connection_manager = None;
1576    }
1577
1578    /// Returns whether automatic reconnection is currently enabled.
1579    pub fn is_auto_reconnect_enabled(&self) -> bool {
1580        self.auto_reconnect_enabled
1581    }
1582
1583    /// Gets the current reconnection configuration.
1584    pub fn get_reconnection_config(&self) -> &ReconnectionConfig {
1585        &self.reconnection_config
1586    }
1587
1588    /// Gets the current heartbeat configuration.
1589    pub fn get_heartbeat_config(&self) -> &HeartbeatConfig {
1590        &self.heartbeat_config
1591    }
1592
1593    /// Updates the reconnection configuration.
1594    pub fn set_reconnection_config(&mut self, config: ReconnectionConfig) {
1595        self.reconnection_config = config;
1596        // Note: ConnectionManager doesn't support runtime config updates
1597        // The configuration is set during ConnectionManager creation
1598    }
1599
1600    /// Updates the heartbeat configuration.
1601    pub fn set_heartbeat_config(&mut self, config: HeartbeatConfig) {
1602        self.heartbeat_config = config;
1603        // Note: ConnectionManager doesn't support runtime config updates
1604        // The configuration is set during ConnectionManager creation
1605    }
1606
1607    /// Gets the current connection state from the connection manager.
1608    pub async fn get_connection_state(&self) -> ConnectionState {
1609        if let Some(manager) = &self.connection_manager {
1610            manager.get_connection_state().await
1611        } else {
1612            ConnectionState::Disconnected
1613        }
1614    }
1615
1616    /// Gets connection metrics from the connection manager.
1617    pub async fn get_connection_metrics(&self) -> crate::connection::management::ConnectionMetrics {
1618        if let Some(manager) = &self.connection_manager {
1619            manager.get_metrics().await
1620        } else {
1621            crate::connection::management::ConnectionMetrics::default()
1622        }
1623    }
1624
1625    /// Forces an immediate reconnection attempt if auto-reconnect is enabled.
1626    pub async fn force_reconnect(&mut self) -> Result<(), LightstreamerError> {
1627        if !self.auto_reconnect_enabled {
1628            return Err(LightstreamerError::invalid_state(
1629                "Auto-reconnect is not enabled",
1630            ));
1631        }
1632
1633        if let Some(manager) = &mut self.connection_manager {
1634            manager.force_reconnect().await?;
1635        }
1636
1637        Ok(())
1638    }
1639}
1640
1641/// Computes the effective liveness window after the server declares its
1642/// keepalive interval in the `conok` message.
1643///
1644/// Returns the server-declared value when no window was configured
1645/// (`current_ms == 0`), otherwise the larger of the two — the window may only
1646/// widen, never shrink, so a server declaring a longer keepalive cannot cause
1647/// false-positive disconnects.
#[must_use]
#[inline]
fn widen_keepalive_timeout(current_ms: u64, server_declared_ms: u64) -> u64 {
    // `current_ms == 0` means no window was configured. For unsigned values the
    // maximum of 0 and the server value is the server value, so a single `max`
    // covers both the unconfigured case and the widen-only case.
    std::cmp::max(current_ms, server_declared_ms)
}
1657
1658#[cfg(test)]
1659mod tests {
1660    use super::*;
1661    use crate::subscription::{Subscription, SubscriptionListener, SubscriptionMode};
1662
1663    use std::fmt::Debug;
1664    use std::sync::{Arc, Mutex};
1665    use tokio::sync::Notify;
1666
1667    #[derive(Debug)]
1668    struct MockClientListener {
1669        property_changes: Arc<Mutex<Vec<String>>>,
1670        status_changes: Arc<Mutex<Vec<String>>>,
1671        server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1672    }
1673
1674    impl MockClientListener {
1675        fn new() -> Self {
1676            MockClientListener {
1677                property_changes: Arc::new(Mutex::new(Vec::new())),
1678                status_changes: Arc::new(Mutex::new(Vec::new())),
1679                server_errors: Arc::new(Mutex::new(Vec::new())),
1680            }
1681        }
1682
1683        #[allow(dead_code)]
1684        fn with_shared_data(
1685            property_changes: Arc<Mutex<Vec<String>>>,
1686            status_changes: Arc<Mutex<Vec<String>>>,
1687            server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1688        ) -> Self {
1689            MockClientListener {
1690                property_changes,
1691                status_changes,
1692                server_errors,
1693            }
1694        }
1695    }
1696
1697    impl ClientListener for MockClientListener {
1698        fn on_property_change(&self, property: &str) {
1699            if let Ok(mut guard) = self.property_changes.lock() {
1700                guard.push(property.to_string());
1701            }
1702        }
1703
1704        fn on_status_change(&self, status: &str) {
1705            if let Ok(mut guard) = self.status_changes.lock() {
1706                guard.push(status.to_string());
1707            }
1708        }
1709
1710        fn on_server_error(&self, code: i32, message: &str) {
1711            if let Ok(mut guard) = self.server_errors.lock() {
1712                guard.push((code, message.to_string()));
1713            }
1714        }
1715    }
1716
1717    #[allow(dead_code)]
1718    struct MockSubscriptionListener;
1719
1720    impl SubscriptionListener for MockSubscriptionListener {
1721        fn on_subscription(&mut self) {}
1722        fn on_unsubscription(&mut self) {}
1723        fn on_item_update(&self, _update: &ItemUpdate) {}
1724    }
1725
1726    impl Debug for MockSubscriptionListener {
1727        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1728            write!(f, "MockSubscriptionListener")
1729        }
1730    }
1731
1732    #[allow(dead_code)]
1733    struct LightstreamerClientTestContext {
1734        client: LightstreamerClient,
1735        property_changes: Arc<Mutex<Vec<String>>>,
1736        status_changes: Arc<Mutex<Vec<String>>>,
1737        server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1738    }
1739
1740    impl LightstreamerClientTestContext {
1741        #[allow(dead_code)]
1742        fn new() -> Result<Self, LightstreamerError> {
1743            let property_changes = Arc::new(Mutex::new(Vec::new()));
1744            let status_changes = Arc::new(Mutex::new(Vec::new()));
1745            let server_errors = Arc::new(Mutex::new(Vec::new()));
1746            let listener = MockClientListener::with_shared_data(
1747                Arc::clone(&property_changes),
1748                Arc::clone(&status_changes),
1749                Arc::clone(&server_errors),
1750            );
1751
1752            let mut client = LightstreamerClient::new(
1753                Some("http://test.lightstreamer.com"),
1754                Some("DEMO"),
1755                None,
1756                None,
1757            )?;
1758            client.add_listener(Box::new(listener));
1759
1760            Ok(LightstreamerClientTestContext {
1761                client,
1762                property_changes,
1763                status_changes,
1764                server_errors,
1765            })
1766        }
1767    }
1768
1769    #[test]
1770    fn test_new_lightstreamer_client() -> Result<(), LightstreamerError> {
1771        let client = LightstreamerClient::new(
1772            Some("http://test.lightstreamer.com"),
1773            Some("DEMO"),
1774            None,
1775            None,
1776        )?;
1777        assert_eq!(
1778            client.server_address,
1779            Some("http://test.lightstreamer.com".to_string())
1780        );
1781        assert_eq!(client.adapter_set, Some("DEMO".to_string()));
1782        let client = LightstreamerClient::new(
1783            Some("http://test.lightstreamer.com"),
1784            Some("DEMO"),
1785            Some("user1"),
1786            Some("pass1"),
1787        )?;
1788        assert_eq!(
1789            client.connection_details.get_user(),
1790            Some(&"user1".to_string())
1791        );
1792        assert_eq!(
1793            client.connection_details.get_password(),
1794            Some(&"pass1".to_string())
1795        );
1796        let result = LightstreamerClient::new(Some("invalid-url"), Some("DEMO"), None, None);
1797        assert!(result.is_err());
1798        let client = LightstreamerClient::new(None, Some("DEMO"), None, None)?;
1799        assert_eq!(client.server_address, None);
1800        Ok(())
1801    }
1802
1803    #[test]
1804    fn test_add_listener() -> Result<(), LightstreamerError> {
1805        let mut client = LightstreamerClient::new(
1806            Some("http://test.lightstreamer.com"),
1807            Some("DEMO"),
1808            None,
1809            None,
1810        )?;
1811        assert_eq!(client.listeners.len(), 0);
1812        let listener = Box::new(MockClientListener::new());
1813        client.add_listener(listener);
1814        assert_eq!(client.listeners.len(), 1);
1815        let listener2 = Box::new(MockClientListener::new());
1816        client.add_listener(listener2);
1817        assert_eq!(client.listeners.len(), 2);
1818        Ok(())
1819    }
1820
1821    #[test]
1822    fn test_get_listeners() -> Result<(), LightstreamerError> {
1823        let mut client = LightstreamerClient::new(
1824            Some("http://test.lightstreamer.com"),
1825            Some("DEMO"),
1826            None,
1827            None,
1828        )?;
1829        assert_eq!(client.get_listeners().len(), 0);
1830
1831        let listener = Box::new(MockClientListener::new());
1832        client.add_listener(listener);
1833        assert_eq!(client.get_listeners().len(), 1);
1834        Ok(())
1835    }
1836
1837    #[test]
1838    fn test_get_status() -> Result<(), LightstreamerError> {
1839        let client = LightstreamerClient::new(
1840            Some("http://test.lightstreamer.com"),
1841            Some("DEMO"),
1842            None,
1843            None,
1844        )?;
1845        match client.get_status() {
1846            ClientStatus::Disconnected(DisconnectionType::WillRetry) => {}
1847            _ => panic!("Expected initial status to be DISCONNECTED:WILL-RETRY"),
1848        }
1849        Ok(())
1850    }
1851
1852    #[test]
1853    fn test_get_subscriptions() -> Result<(), LightstreamerError> {
1854        let client = LightstreamerClient::new(
1855            Some("http://test.lightstreamer.com"),
1856            Some("DEMO"),
1857            None,
1858            None,
1859        )?;
1860        assert_eq!(client.get_subscriptions().len(), 0);
1861        Ok(())
1862    }
1863
1864    #[tokio::test]
1865    async fn test_connect_with_no_server_address() -> Result<(), LightstreamerError> {
1866        let client = LightstreamerClient::new(None, Some("DEMO"), None, None)?;
1867        let client_arc = Arc::new(tokio::sync::Mutex::new(client));
1868        let shutdown_signal = Arc::new(Notify::new());
1869        let result = LightstreamerClient::connect(client_arc, shutdown_signal).await;
1870        assert!(result.is_err());
1871        Ok(())
1872    }
1873
1874    #[tokio::test]
1875    async fn test_forced_transport_validation() -> Result<(), LightstreamerError> {
1876        let mut client = LightstreamerClient::new(
1877            Some("http://test.lightstreamer.com"),
1878            Some("DEMO"),
1879            None,
1880            None,
1881        )?;
1882
1883        client.connection_options.set_forced_transport(None);
1884        let client_arc = Arc::new(tokio::sync::Mutex::new(client));
1885        let shutdown_signal = Arc::new(Notify::new());
1886        let result = LightstreamerClient::connect(client_arc.clone(), shutdown_signal).await;
1887        assert!(result.is_err());
1888        client_arc
1889            .lock()
1890            .await
1891            .connection_options
1892            .set_forced_transport(Some(Transport::WsStreaming));
1893        Ok(())
1894    }
1895
1896    #[test]
1897    fn test_subscription_params_generation() -> Result<(), LightstreamerError> {
1898        let subscription = Subscription::new(
1899            SubscriptionMode::Merge,
1900            Some(vec!["item1".to_string(), "item2".to_string()]),
1901            Some(vec!["field1".to_string(), "field2".to_string()]),
1902        )?;
1903
1904        let params_str = LightstreamerClient::get_subscription_params(&subscription, 1)?;
1905
1906        assert!(params_str.contains("LS_reqId=1"));
1907        assert!(params_str.contains("LS_op=add"));
1908        assert!(params_str.contains("LS_subId="));
1909        assert!(params_str.contains("LS_mode=MERGE"));
1910        assert!(params_str.contains("LS_group="));
1911        assert!(params_str.contains("LS_schema="));
1912        Ok(())
1913    }
1914
1915    #[test]
1916    fn test_unsubscription_params_generation() -> Result<(), LightstreamerError> {
1917        let params_str = LightstreamerClient::get_unsubscription_params(42, 123)?;
1918
1919        assert!(params_str.contains("LS_reqId=123"));
1920        assert!(params_str.contains("LS_op=delete"));
1921        assert!(params_str.contains("LS_subId=42"));
1922        Ok(())
1923    }
1924
1925    #[test]
1926    fn test_logging_functions() -> Result<(), LightstreamerError> {
1927        let mut client = LightstreamerClient::new(
1928            Some("http://test.lightstreamer.com"),
1929            Some("DEMO"),
1930            None,
1931            None,
1932        )?;
1933
1934        client.set_logging_type(LogType::StdLogs);
1935
1936        client.make_log(Level::INFO, "Test log message");
1937        client.make_log(Level::DEBUG, "Test debug message");
1938        client.set_logging_type(LogType::TracingLogs);
1939        client.make_log(Level::INFO, "Test tracing log message");
1940        client.make_log(Level::DEBUG, "Test tracing debug message");
1941        Ok(())
1942    }
1943
1944    #[test]
1945    fn test_debug_implementation() -> Result<(), LightstreamerError> {
1946        let client = LightstreamerClient::new(
1947            Some("http://test.lightstreamer.com"),
1948            Some("DEMO"),
1949            None,
1950            None,
1951        )?;
1952
1953        // Test that Debug implementation works without panicking
1954        let debug_string = format!("{:?}", client);
1955
1956        // Verify it contains expected fields
1957        assert!(debug_string.contains("server_address"));
1958        assert!(debug_string.contains("adapter_set"));
1959        assert!(debug_string.contains("connection_details"));
1960        assert!(debug_string.contains("connection_options"));
1961        assert!(debug_string.contains("listeners"));
1962        assert!(debug_string.contains("subscriptions"));
1963
1964        // Verify the values are included
1965        assert!(debug_string.contains("http://test.lightstreamer.com"));
1966        assert!(debug_string.contains("DEMO"));
1967        Ok(())
1968    }
1969
1970    #[test]
1971    #[should_panic(expected = "Implement mechanism to add cookies to LightstreamerClient")]
1972    fn test_add_cookies() {
1973        // Test the static method add_cookies
1974        let cookie = Cookie::new("test_cookie", "test_value");
1975        LightstreamerClient::add_cookies("http://test.lightstreamer.com", &cookie);
1976    }
1977
1978    #[test]
1979    #[should_panic(expected = "not implemented")]
1980    fn test_get_cookies() {
1981        // Test the static method get_cookies
1982        LightstreamerClient::get_cookies(Some("http://test.lightstreamer.com"));
1983    }
1984}
1985
/// Tests for `widen_keepalive_timeout`.
///
/// Going by the test names, the first argument is the locally configured
/// timeout and the second is the value reported by the server.
#[cfg(test)]
mod liveness_tests {
    use super::widen_keepalive_timeout;

    /// A configured value of 0 yields the server's value.
    #[test]
    fn test_widen_keepalive_timeout_unconfigured_adopts_server_value() {
        let (configured, from_server) = (0, 5_000);
        assert_eq!(widen_keepalive_timeout(configured, from_server), 5_000);
    }

    /// A larger server value replaces the configured one.
    #[test]
    fn test_widen_keepalive_timeout_server_longer_widens() {
        let (configured, from_server) = (15_000, 30_000);
        assert_eq!(widen_keepalive_timeout(configured, from_server), 30_000);
    }

    /// A smaller server value leaves the configured one in place.
    #[test]
    fn test_widen_keepalive_timeout_server_shorter_never_shrinks() {
        let (configured, from_server) = (15_000, 5_000);
        assert_eq!(widen_keepalive_timeout(configured, from_server), 15_000);
    }

    /// Equal values come back unchanged.
    #[test]
    fn test_widen_keepalive_timeout_equal_values_unchanged() {
        let (configured, from_server) = (15_000, 15_000);
        assert_eq!(widen_keepalive_timeout(configured, from_server), 15_000);
    }
}