Skip to main content

lightstreamer_rs/client/
implementation.rs

1use crate::subscription::{ItemUpdate, Snapshot, Subscription, SubscriptionMode};
2
3use crate::client::Transport;
4pub(crate) use crate::client::listener::ClientListener;
5use crate::client::message_listener::ClientMessageListener;
6use crate::client::model::{ClientStatus, DisconnectionType, LogType};
7use crate::client::request::SubscriptionRequest;
8use crate::client::utils::get_subscription_by_id;
9use crate::connection::{ConnectionDetails, ConnectionOptions};
10use crate::connection::{ConnectionManager, ConnectionState, HeartbeatConfig, ReconnectionConfig};
11use crate::utils::{LightstreamerError, clean_message, parse_arguments};
12use cookie::Cookie;
13use futures_util::{SinkExt, StreamExt};
14use std::collections::HashMap;
15use std::fmt::{self, Debug, Formatter};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::mpsc::channel;
19use tokio::sync::{
20    Mutex, Notify,
21    mpsc::{Receiver, Sender},
22};
23use tokio::time::Instant as TokioInstant;
24use tokio_tungstenite::{
25    connect_async,
26    tungstenite::{
27        Message,
28        http::{HeaderName, HeaderValue, Request},
29    },
30};
31use tracing::{Level, debug, error, info, instrument, trace, warn};
32use url::Url;
33
34/// Facade class for the management of the communication to Lightstreamer Server. Used to provide
35/// configuration settings, event handlers, operations for the control of the connection lifecycle,
36/// Subscription handling and to send messages.
37///
38/// An instance of `LightstreamerClient` handles the communication with Lightstreamer Server on a
39/// specified endpoint. Hence, it hosts one "Session"; or, more precisely, a sequence of Sessions,
40/// since any Session may fail and be recovered, or it can be interrupted on purpose. So, normally,
41/// a single instance of `LightstreamerClient` is needed.
42///
43/// However, multiple instances of `LightstreamerClient` can be used, toward the same or multiple
44/// endpoints.
45///
46/// You can listen to the events generated by a session by registering an event listener, such as
47/// `ClientListener` or `SubscriptionListener`. These listeners allow you to handle various events,
48/// such as session creation, connection status, subscription updates, and server messages. However,
49/// you should be aware that the event notifications are dispatched by a single thread, the so-called
50/// event thread. This means that if the operations of a listener are slow or blocking, they will
51/// delay the processing of the other listeners and affect the performance of your application.
52/// Therefore, you should delegate any slow or blocking operations to a dedicated thread, and keep
53/// the listener methods as fast and simple as possible. Note that even if you create multiple
54/// instances of `LightstreamerClient`, they will all use a single event thread, that is shared
55/// among them.
56///
57/// # Parameters
58///
59/// * `server_address`: the address of the Lightstreamer Server to which this `LightstreamerClient`
60///   will connect to. It is possible to specify it later by using `None` here. See
61///   `ConnectionDetails.setServerAddress()` for details.
62/// * `adapter_set`: the name of the Adapter Set mounted on Lightstreamer Server to be used to handle
63///   all requests in the Session associated with this `LightstreamerClient`. It is possible not to
64///   specify it at all or to specify it later by using `None` here. See `ConnectionDetails.setAdapterSet()`
65///   for details.
66///
67/// # Raises
68///
69/// * `IllegalArgumentException`: if a not valid address is passed. See `ConnectionDetails.setServerAddress()`
70///   for details.
71pub struct LightstreamerClient {
72    /// The address of the Lightstreamer Server to which this `LightstreamerClient` will connect.
73    server_address: Option<String>,
74    /// The name of the Adapter Set mounted on Lightstreamer Server to be used to handle all
75    /// requests in the Session associated with this `LightstreamerClient`.
76    adapter_set: Option<String>,
77    /// Data object that contains the details needed to open a connection to a Lightstreamer Server.
78    /// This instance is set up by the `LightstreamerClient` object at its own creation. Properties
79    /// of this object can be overwritten by values received from a Lightstreamer Server.
80    pub connection_details: ConnectionDetails,
81    /// Data object that contains options and policies for the connection to the server. This instance
82    /// is set up by the `LightstreamerClient` object at its own creation. Properties of this object
83    /// can be overwritten by values received from a Lightstreamer Server.
84    pub connection_options: ConnectionOptions,
85    /// A list of listeners that will receive events from the `LightstreamerClient` instance.
86    listeners: Vec<Box<dyn ClientListener>>,
87    /// A list containing all the `Subscription` instances that are currently "active" on this
88    /// `LightstreamerClient`.
89    subscriptions: Vec<Subscription>,
90    /// The current status of the client.
91    status: ClientStatus,
92    /// Logging Type to be used
93    logging: LogType,
94    /// The sender that can be used to subscribe/unsubscribe
95    pub subscription_sender: Sender<SubscriptionRequest>,
96    /// The receiver used for subscribe/unsubsribe
97    subscription_receiver: Receiver<SubscriptionRequest>,
98    /// Connection manager for automatic reconnection
99    connection_manager: Option<Arc<ConnectionManager>>,
100    /// Configuration for reconnection behavior
101    reconnection_config: ReconnectionConfig,
102    /// Configuration for heartbeat monitoring
103    heartbeat_config: HeartbeatConfig,
104    /// Whether automatic reconnection is enabled
105    auto_reconnect_enabled: bool,
106}
107
108impl Debug for LightstreamerClient {
109    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
110        f.debug_struct("LightstreamerClient")
111            .field("server_address", &self.server_address)
112            .field("adapter_set", &self.adapter_set)
113            .field("connection_details", &self.connection_details)
114            .field("connection_options", &self.connection_options)
115            .field("listeners", &self.listeners)
116            .field("subscriptions", &self.subscriptions)
117            .finish()
118    }
119}
120
121impl LightstreamerClient {
122    /// A constant string representing the name of the library.
123    pub const LIB_NAME: &'static str = "rust_client";
124
125    /// A constant string representing the version of the library.
126    pub const LIB_VERSION: &'static str = "0.1.0";
127
128    //
129    // Constants for WebSocket connection.
130    //
131    /// WebSocket key used in the WebSocket handshake process.
132    /// This is a base64-encoded value that is used to establish the WebSocket connection.
133    pub const SEC_WEBSOCKET_KEY: &'static str = "PNDUibe9ex7PnsrLbt0N4w==";
134
135    /// WebSocket protocol identifier for the TLCP protocol used by Lightstreamer.
136    /// This identifies the specific subprotocol used over the WebSocket connection.
137    pub const SEC_WEBSOCKET_PROTOCOL: &'static str = "TLCP-2.4.0.lightstreamer.com";
138
139    /// WebSocket version used for the connection.
140    /// Version 13 is the standard version used in modern WebSocket implementations.
141    pub const SEC_WEBSOCKET_VERSION: &'static str = "13";
142
143    /// WebSocket upgrade header value.
144    /// Used to indicate that the client wishes to upgrade from HTTP to WebSocket protocol.
145    pub const SEC_WEBSOCKET_UPGRADE: &'static str = "websocket";
146
147    /// A constant string representing the version of the TLCP protocol used by the library.
148    pub const TLCP_VERSION: &'static str = "TLCP-2.4.0";
149
150    /// Grace period (milliseconds) added on top of the keepalive interval before
151    /// the connection is considered dead by the liveness watchdog.
152    ///
153    /// The server guarantees a message (data or `probe`) at least every keepalive
154    /// interval; the grace absorbs network jitter and scheduling delays.
155    pub const KEEPALIVE_GRACE_MS: u64 = 5_000;
156
157    /// Static method that can be used to share cookies between connections to the Server (performed by
158    /// this library) and connections to other sites that are performed by the application. With this
159    /// method, cookies received by the application can be added (or replaced if already present) to
160    /// the cookie set used by the library to access the Server. Obviously, only cookies whose domain
161    /// is compatible with the Server domain will be used internally.
162    ///
163    /// This method should be invoked before calling the `LightstreamerClient.connect()` method.
164    /// However it can be invoked at any time; it will affect the internal cookie set immediately
165    /// and the sending of cookies on the next HTTP request or WebSocket establishment.
166    ///
167    /// # Parameters
168    ///
169    /// * `uri`: the URI from which the supplied cookies were received. It cannot be `None`.
170    /// * `cookies`: an instance of `http.cookies.SimpleCookie`.
171    ///
172    /// See also `getCookies()`
173    pub fn add_cookies(_uri: &str, _cookies: &Cookie) {
174        // Implementation for add_cookies
175        unimplemented!("Implement mechanism to add cookies to LightstreamerClient");
176    }
177
178    /// Adds a listener that will receive events from the `LightstreamerClient` instance.
179    ///
180    /// The same listener can be added to several different `LightstreamerClient` instances.
181    ///
182    /// A listener can be added at any time. A call to add a listener already present will be ignored.
183    ///
184    /// # Parameters
185    ///
186    /// * `listener`: An object that will receive the events as documented in the `ClientListener`
187    ///   interface.
188    ///
189    /// See also `removeListener()`
190    pub fn add_listener(&mut self, listener: Box<dyn ClientListener>) {
191        self.listeners.push(listener);
192    }
193
194    /// Packs s string with the necessary parameters for a subscription request.
195    ///
196    /// # Parameters
197    ///
198    /// * `subscription`: The subscription for which to get the parameters.
199    /// * `request_id`: The request ID to use in the parameters.
200    ///
201    fn get_subscription_params(
202        subscription: &Subscription,
203        request_id: usize,
204    ) -> Result<String, LightstreamerError> {
205        let ls_req_id = request_id.to_string();
206        let ls_sub_id = subscription.id.to_string();
207        let ls_mode = subscription.get_mode().to_string();
208        let ls_group = match subscription.get_item_group() {
209            Some(item_group) => item_group.to_string(),
210            None => match subscription.get_items() {
211                Some(items) => items.join(" "),
212                None => {
213                    return Err(LightstreamerError::invalid_argument(
214                        "No item group or items found in subscription.",
215                    ));
216                }
217            },
218        };
219        let ls_schema = match subscription.get_field_schema() {
220            Some(field_schema) => field_schema.to_string(),
221            None => match subscription.get_fields() {
222                Some(fields) => fields.join(" "),
223                None => {
224                    return Err(LightstreamerError::invalid_argument(
225                        "No field schema or fields found in subscription.",
226                    ));
227                }
228            },
229        };
230        let ls_data_adapter = match subscription.get_data_adapter() {
231            Some(data_adapter) => data_adapter.to_string(),
232            None => "".to_string(),
233        };
234        let ls_snapshot = subscription
235            .get_requested_snapshot()
236            .unwrap_or_default()
237            .to_string();
238        //
239        // Prepare the subscription request.
240        //
241        let mut params: Vec<(&str, &str)> = vec![
242            ("LS_data_adapter", &ls_data_adapter),
243            ("LS_reqId", &ls_req_id),
244            ("LS_op", "add"),
245            ("LS_subId", &ls_sub_id),
246            ("LS_mode", &ls_mode),
247            ("LS_group", &ls_group),
248            ("LS_schema", &ls_schema),
249            ("LS_ack", "false"),
250        ];
251        // Remove the data adapter parameter if not specified.
252        if ls_data_adapter.is_empty() {
253            params.remove(0);
254        }
255        if !ls_snapshot.is_empty() {
256            params.push(("LS_snapshot", &ls_snapshot));
257        }
258
259        Ok(serde_urlencoded::to_string(&params)?)
260    }
261
262    fn get_unsubscription_params(
263        subscription_id: usize,
264        request_id: usize,
265    ) -> Result<String, LightstreamerError> {
266        let ls_req_id = request_id.to_string();
267        let ls_sub_id = subscription_id.to_string();
268        //
269        // Prepare the unsubscription request.
270        //
271        let params: Vec<(&str, &str)> = vec![
272            ("LS_reqId", &ls_req_id),
273            ("LS_op", "delete"),
274            ("LS_subId", &ls_sub_id),
275        ];
276
277        Ok(serde_urlencoded::to_string(&params)?)
278    }
279
280    /// Operation method that requests to open a Session against the configured Lightstreamer Server.
281    ///
282    /// When `connect()` is called, unless a single transport was forced through `ConnectionOptions.setForcedTransport()`,
283    /// the so called "Stream-Sense" mechanism is started: if the client does not receive any answer
284    /// for some seconds from the streaming connection, then it will automatically open a polling
285    /// connection.
286    ///
287    /// A polling connection may also be opened if the environment is not suitable for a streaming
288    /// connection.
289    ///
290    /// Note that as "polling connection" we mean a loop of polling requests, each of which requires
291    /// opening a synchronous (i.e. not streaming) connection to Lightstreamer Server.
292    ///
293    /// Note that the request to connect is accomplished by the client in a separate thread; this
294    /// means that an invocation to `getStatus()` right after `connect()` might not reflect the change
295    /// yet.
296    ///
297    /// When the request to connect is finally being executed, if the current status of the client
298    /// is not `DISCONNECTED`, then nothing will be done.
299    ///
300    /// # Raises
301    ///
302    /// * `IllegalStateException`: if no server address was configured.
303    ///
304    /// See also `getStatus()`
305    ///
306    /// See also `disconnect()`
307    ///
308    /// See also `ClientListener.onStatusChange()`
309    ///
310    /// See also `ConnectionDetails.setServerAddress()`
311    #[instrument(level = "trace")]
312    pub async fn connect(
313        client: Arc<Mutex<Self>>,
314        shutdown_signal: Arc<Notify>,
315    ) -> Result<(), LightstreamerError> {
316        // If auto-reconnect is enabled, use ConnectionManager
317        let auto_reconnect_enabled = {
318            let client_guard = client.lock().await;
319            client_guard.auto_reconnect_enabled
320        };
321
322        if auto_reconnect_enabled {
323            let (reconnection_config, heartbeat_config) = {
324                let client_guard = client.lock().await;
325                (
326                    client_guard.reconnection_config.clone(),
327                    client_guard.heartbeat_config.clone(),
328                )
329            };
330
331            let weak_client = Arc::downgrade(&client);
332            let connection_manager: Arc<ConnectionManager> =
333                ConnectionManager::new(weak_client, reconnection_config, heartbeat_config);
334
335            {
336                let mut client_guard = client.lock().await;
337                client_guard.connection_manager = Some(connection_manager.clone());
338            }
339
340            // Use direct connection method
341            return client.lock().await.connect_direct(shutdown_signal).await;
342        }
343
344        // Fallback to direct connection without auto-reconnect
345        let mut client_guard = client.lock().await;
346        client_guard.connect_direct(shutdown_signal).await
347    }
348
349    /// Direct connection method without automatic reconnection
350    #[instrument(level = "trace")]
351    pub async fn connect_direct(
352        &mut self,
353        shutdown_signal: Arc<Notify>,
354    ) -> Result<(), LightstreamerError> {
355        // Check if the server address is configured.
356        if self.server_address.is_none() {
357            return Err(LightstreamerError::invalid_state(
358                "No server address was configured.",
359            ));
360        }
361        //
362        // Only WebSocket streaming transport is currently supported.
363        //
364        let forced_transport = self.connection_options.get_forced_transport();
365        if forced_transport.is_none_or(|t| *t != Transport::WsStreaming) {
366            return Err(LightstreamerError::invalid_state(
367                "Only WebSocket streaming transport is currently supported.",
368            ));
369        }
370        //
371        // Convert the HTTP URL to a WebSocket URL.
372        //
373        let http_url = self
374            .connection_details
375            .get_server_address()
376            .ok_or_else(|| LightstreamerError::invalid_state("Server address is not set."))?;
377        let mut url = Url::parse(http_url).map_err(|e| {
378            LightstreamerError::invalid_state(format!("Failed to parse server address URL: {}", e))
379        })?;
380        match url.scheme() {
381            "http" => url.set_scheme("ws").map_err(|()| {
382                LightstreamerError::invalid_state("Failed to set scheme to ws for WebSocket URL.")
383            })?,
384            "https" => url.set_scheme("wss").map_err(|()| {
385                LightstreamerError::invalid_state("Failed to set scheme to wss for WebSocket URL.")
386            })?,
387            invalid_scheme => {
388                return Err(LightstreamerError::invalid_state(format!(
389                    "Unsupported scheme '{}' found when converting HTTP URL to WebSocket URL.",
390                    invalid_scheme
391                )));
392            }
393        }
394        let ws_url = url.as_str();
395
396        // Build the WebSocket request with the necessary headers.
397        let request = Request::builder()
398            .uri(ws_url)
399            .header(
400                HeaderName::from_static("connection"),
401                HeaderValue::from_static("Upgrade"),
402            )
403            .header(
404                HeaderName::from_static("host"),
405                HeaderValue::from_str(url.host_str().unwrap_or("localhost")).map_err(|err| {
406                    LightstreamerError::invalid_state(format!(
407                        "Invalid header value for header with name 'host': {}",
408                        err
409                    ))
410                })?,
411            )
412            .header(
413                HeaderName::from_static("sec-websocket-key"),
414                HeaderValue::from_static(Self::SEC_WEBSOCKET_KEY),
415            )
416            .header(
417                HeaderName::from_static("sec-websocket-protocol"),
418                HeaderValue::from_static(Self::SEC_WEBSOCKET_PROTOCOL),
419            )
420            .header(
421                HeaderName::from_static("sec-websocket-version"),
422                HeaderValue::from_static(Self::SEC_WEBSOCKET_VERSION),
423            )
424            .header(
425                HeaderName::from_static("upgrade"),
426                HeaderValue::from_static(Self::SEC_WEBSOCKET_UPGRADE),
427            )
428            .body(())?;
429
430        // Connect to the Lightstreamer server using WebSocket.
431        let ws_stream = match connect_async(request).await {
432            Ok((ws_stream, response)) => {
433                if let Some(server_header) = response.headers().get("server") {
434                    self.make_log(
435                        Level::INFO,
436                        &format!(
437                            "Connected to Lightstreamer server: {}",
438                            server_header.to_str().unwrap_or("")
439                        ),
440                    );
441                } else {
442                    self.make_log(Level::INFO, "Connected to Lightstreamer server");
443                }
444                ws_stream
445            }
446            Err(err) => {
447                return Err(LightstreamerError::connection(format!(
448                    "Failed to connect to Lightstreamer server with WebSocket: {}",
449                    err
450                )));
451            }
452        };
453
454        // Split the WebSocket stream into a write and a read stream.
455        let (mut write_stream, mut read_stream) = ws_stream.split();
456
457        //
458        // Initiate communication with the server by sending a 'wsok' message.
459        //
460        write_stream.send(Message::Text("wsok".into())).await?;
461
462        //
463        // Start reading and processing messages from the server.
464        //
465        let mut is_connected = false;
466        let mut request_id: usize = 0;
467        let mut _session_id: Option<String> = None;
468        let mut subscription_id: usize = 0;
469        let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> =
470            HashMap::new();
471
472        //
473        // Liveness enforcement and reverse heartbeat.
474        //
475        // The server guarantees that some message (data or `probe`) is sent at
476        // least every keepalive interval. If nothing arrives within the
477        // effective keepalive window plus a grace period, the connection is
478        // considered dead (half-open TCP) and an error is returned so the
479        // caller can reconnect. The window starts from the configured
480        // keepalive interval (if any) and is widened to the server-declared
481        // keepalive from the `conok` message once known.
482        //
483        let configured_keepalive_ms = self.connection_options.get_keepalive_interval();
484        let reverse_heartbeat_ms = self.connection_options.get_reverse_heartbeat_interval();
485        let mut keepalive_timeout_ms: u64 = configured_keepalive_ms;
486        let mut last_message_at = TokioInstant::now();
487
488        let mut heartbeat_timer = if reverse_heartbeat_ms > 0 {
489            let mut timer = tokio::time::interval(Duration::from_millis(reverse_heartbeat_ms));
490            timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
491            Some(timer)
492        } else {
493            None
494        };
495
496        loop {
497            let liveness_deadline = last_message_at
498                + Duration::from_millis(
499                    keepalive_timeout_ms.saturating_add(Self::KEEPALIVE_GRACE_MS),
500                );
501            tokio::select! {
502                message = read_stream.next() => {
503                    last_message_at = TokioInstant::now();
504                    match message {
505                        Some(Ok(Message::Text(text))) => {
506                            // Messages could include multiple submessages separated by /r/n.
507                            // Split the message into submessages and process each one separately.
508                            let submessages: Vec<&str> = text.split("\r\n")
509                                .filter(|&line| !line.trim().is_empty()) // Filter out empty lines.
510                                .collect();
511                            for submessage in submessages {
512                                let clean_text = clean_message(submessage);
513                                let submessage_fields: Vec<&str> = clean_text.split(",").collect();
514                                match *submessage_fields.first().unwrap_or(&"") {
515                                    //
516                                    // Errors from server.
517                                    //
518                                    //
519                                    // Session-level error: the session is not usable — fail the
520                                    // connection so the caller can reconnect. (The previous
521                                    // `break` only exited the submessage loop, leaving a dead
522                                    // session looping forever.)
523                                    //
524                                    "conerr" => {
525                                        self.make_log( Level::ERROR, &format!("Received connection error from Lightstreamer server: {}", clean_text) );
526                                        return Err(LightstreamerError::connection(format!(
527                                            "connection error from server: {}",
528                                            clean_text
529                                        )));
530                                    },
531                                    //
532                                    // Request-level error: the session survives — log and continue.
533                                    //
534                                    "reqerr" => {
535                                        self.make_log( Level::ERROR, &format!("Received request error from Lightstreamer server: {}", clean_text) );
536                                    },
537                                    //
538                                    // Session created successfully.
539                                    //
540                                    "conok" => {
541                                        is_connected = true;
542                                        // CONOK,<session-ID>,<request-limit>,<keep-alive>,<control-link>
543                                        // Widen the liveness window to the server-declared keepalive
544                                        // so enforcement works even when no keepalive was configured.
545                                        if let Some(server_keepalive_ms) = submessage_fields
546                                            .get(3)
547                                            .and_then(|v| v.trim().parse::<u64>().ok())
548                                            .filter(|v| *v > 0)
549                                        {
550                                            keepalive_timeout_ms = widen_keepalive_timeout(
551                                                keepalive_timeout_ms,
552                                                server_keepalive_ms,
553                                            );
554                                            self.make_log( Level::DEBUG, &format!("Server-declared keepalive interval: {} ms", server_keepalive_ms) );
555                                        }
556                                        if let Some(session_id) = submessage_fields.get(1) {
557                                            self.make_log( Level::DEBUG, &format!("Session creation confirmed by server: {}", clean_text) );
558                                            self.make_log( Level::DEBUG, &format!("Session created with ID: {:?}", session_id) );
559                                            //
560                                            // Subscribe to the desired items.
561                                            //
562                                            while let Some(subscription) = self.subscriptions.get_mut(subscription_id) {
563                                                //
564                                                // Gather all the necessary subscription parameters.
565                                                //
566                                                subscription_id += 1;
567                                                request_id += 1;
568                                                subscription.id = subscription_id;
569                                                subscription.id_sender.try_send(subscription_id)?;
570
571                                                let encoded_params = match Self::get_subscription_params(subscription, request_id)
572                                                {
573                                                    Ok(params) => params,
574                                                    Err(err) => {
575                                                        return Err(err);
576                                                    },
577                                                };
578
579                                                write_stream
580                                                    .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
581                                                    .await?;
582                                                debug!("Sent subscription request: '{}'", encoded_params);
583                                            }
584                                        } else {
585                                            return Err(LightstreamerError::protocol(
586                                                "Session ID not found in 'conok' message from server",
587                                            ));
588                                        }
589                                    },
590                                    //
591                                    // Notifications from server.
592                                    //
593                                    "conf" | "cons" | "clientip" | "servname" | "prog" | "sync" | "eos" => {
594                                        self.make_log( Level::INFO, &format!("Received notification from server: {}", clean_text) );
595                                        // Don't do anything with these notifications for now.
596                                    },
597                                    "probe" => {
598                                        self.make_log( Level::DEBUG, &format!("Received probe message from server: {}", clean_text ) );
599                                    },
600                                    "reqok" => {
601                                        self.make_log( Level::DEBUG, &format!("Received reqok message from server: '{}'", clean_text ) );
602                                    },
603                                    //
604                                    // Subscription confirmation from server.
605                                    //
606                                    "subok" => {
607                                        self.make_log( Level::INFO, &format!("Subscription confirmed by server: '{}'", clean_text) );
608                                    },
609                                    //
610                                    // Usubscription confirmation from server.
611                                    //
612                                    "unsub" => {
613                                        self.make_log( Level::INFO, &format!("Unsubscription confirmed by server: '{}'", clean_text) );
614                                    },
615                                    //
616                                    // Data updates from server.
617                                    //
618                                    "u" => {
619                                        // Parse arguments from the received message.
620                                        let arguments = parse_arguments(&clean_text);
621                                        //
622                                        // Extract the subscription from the first argument.
623                                        //
624                                        let subscription_index = arguments.get(1).unwrap_or(&"").parse::<usize>().unwrap_or(0);
625                                        let subscription = match get_subscription_by_id(self.get_subscriptions(), subscription_index) {
626                                            Some(subscription) => subscription,
627                                            None => {
628                                                self.make_log( Level::WARN, &format!("Subscription not found for index: {}", subscription_index) );
629                                                continue;
630
631                                            }
632                                        };
633                                        //
634                                        // Extract the item from the second argument.
635                                        //
636                                        let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0);
637                                        let item = match subscription.get_items() {
638                                            Some(items) => items.get(item_index-1),
639                                            None => {
640                                                self.make_log( Level::WARN, &format!("No items found in subscription: {:?}", subscription) );
641                                                continue;
642                                            }
643                                        };
644                                        //
645                                        // Determine if the update is a snapshot or real-time update based on the subscription parameters.
646                                        //
647                                        let is_snapshot = match subscription.get_requested_snapshot() {
648                                            Some(ls_snapshot) => {
649                                                match ls_snapshot {
650                                                    Snapshot::No => false,
651                                                    Snapshot::Yes => {
652                                                        match subscription.get_mode() {
653                                                            SubscriptionMode::Merge => {
654                                                                if arguments.len() == 4 && arguments[3] == "$" {
655                                                                    // EOS notification received
656                                                                    true
657                                                                } else {
658                                                                    // If item doesn't exist in item_updates yet, the first update
659                                                                    // is always a snapshot.
660                                                                    if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) {
661                                                                        if item_updates.get(&(item_index)).is_some() {
662                                                                            // Item update already exists in item_updates, so it's not a snapshot.
663                                                                            false
664                                                                        } else {
665                                                                            // Item update doesn't exist in item_updates, so the first update is always a snapshot.
666                                                                            true
667                                                                        }
668                                                                    } else {
669                                                                        // Item updates not found for subscription, so the first update is always a snapshot.
670                                                                        true
671                                                                    }
672                                                                }
673                                                            },
674                                                            SubscriptionMode::Distinct | SubscriptionMode::Command => {
675                                                                !subscription.is_subscribed()
676                                                            },
677                                                            _ => false,
678                                                        }
679                                                    },
680                                                    _ => false,
681                                                }
682                                            },
683                                            None => false,
684                                        };
685
686                                        // Extract the field values from the third argument.
687                                        let field_values: Vec<&str> = arguments.get(3).unwrap_or(&"").split('|').collect();
688
689                                        //
690                                        // Get fields from subscription and create a HashMap of field names and values.
691                                        //
692                                        let subscription_fields = subscription.get_fields();
693                                        let mut field_map: HashMap<String, Option<String>> = subscription_fields
694                                            .map(|fields| fields.iter().map(|field_name| (field_name.to_string(), None)).collect())
695                                            .unwrap_or_default();
696
697                                        let mut field_index = 0;
698                                        for value in field_values {
699                                            match value {
700                                                "" => {
701                                                    // An empty value means the field is unchanged compared to the previous update of the same field.
702                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
703                                                        field_map.insert(field_name.to_string(), None);
704                                                    }
705                                                    field_index += 1;
706                                                }
707                                                "#" | "$" => {
708                                                    // A value corresponding to a hash sign "#" or dollar sign "$" means the field is null or empty.
709                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
710                                                        field_map.insert(field_name.to_string(), Some("".to_string()));
711                                                    }
712                                                    field_index += 1;
713                                                }
714                                                value if value.starts_with('^') => {
715                                                    let command = value.chars().nth(1).unwrap_or(' ');
716                                                    match command {
717                                                        '0'..='9' => {
718                                                            let count = value[1..].parse().unwrap_or(0);
719                                                            for i in 0..count {
720                                                                if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index + i)) {
721                                                                    field_map.insert(field_name.to_string(), None);
722                                                                }
723                                                            }
724                                                            field_index += count;
725                                                        }
726                                                        'P' | 'T' => {
727                                                            let diff_value = serde_urlencoded::from_str(&value[2..]).unwrap_or_else(|_| value[2..].to_string());
728                                                            if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index))
729                                                                && let Some(prev_value) = field_map.get(field_name).and_then(|v| v.as_ref()) {
730                                                                    let new_value = match command {
731                                                                        'P' => {
732                                                                            // Apply JSON Patch
733                                                                            let patch: serde_json::Value = serde_json::from_str(&diff_value).unwrap_or(serde_json::Value::Null);
734                                                                            let mut prev_json: serde_json::Value = serde_json::from_str(prev_value).unwrap_or(serde_json::Value::Null);
735                                                                            let patch_operations: Vec<json_patch::PatchOperation> = serde_json::from_value(patch).unwrap_or_default();
736                                                                            if let Err(e) = json_patch::patch(&mut prev_json, &patch_operations) {
737                                                                                tracing::warn!("Failed to apply JSON patch: {}", e);
738                                                                            }
739                                                                            prev_json.to_string()
740                                                                        }
741                                                                        'T' => {
742                                                                            // Apply TLCP-diff
743                                                                            //tlcp_diff::apply_diff(prev_value, &diff_value).unwrap_or_else(|_| prev_value.to_string())
744                                                                            unimplemented!("Implement TLCP-diff");
745                                                                        }
746                                                                        _ => unreachable!(),
747                                                                    };
748                                                                    field_map.insert(field_name.to_string(), Some(new_value.to_string()));
749                                                                }
750                                                            field_index += 1;
751                                                        }
752                                                        _ => {
753                                                            let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
754                                                            if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
755                                                                field_map.insert(field_name.to_string(), Some(decoded_value));
756                                                            }
757                                                            field_index += 1;
758                                                        }
759                                                    }
760                                                }
761                                                value if value.starts_with('{') => {
762                                                    // in this case it is a json payload that we will let the consumer handle. In this case, it is important
763                                                    // to preserve casing for parsing.
764                                                    let original_json = parse_arguments(submessage).get(3).unwrap_or(&"").split('|').collect::<Vec<&str>>();
765                                                    let mut payload = "";
766                                                    for json in original_json.iter()
767                                                    {
768                                                        if json.is_empty() || *json == "#"
769                                                        {
770                                                            continue;
771                                                        }
772
773                                                        payload = json;
774                                                    }
775
776                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
777                                                        field_map.insert(field_name.to_string(), Some(payload.to_string()));
778                                                    }
779                                                    field_index += 1;
780                                                }
781                                                _ => {
782                                                    let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
783                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
784                                                        field_map.insert(field_name.to_string(), Some(decoded_value));
785                                                    }
786                                                    field_index += 1;
787                                                }
788                                            }
789                                        }
790
791                                        // Store only item_update's changed fields.
792                                        let changed_fields: HashMap<String, String> = field_map.iter()
793                                            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
794                                            .collect();
795
796                                        //
797                                        // Take the proper item_update from item_updates and update it with changed fields.
798                                        // If the item_update doesn't exist yet, create a new one.
799                                        //
800                                        let current_item_update: ItemUpdate;
801                                        match subscription_item_updates.get_mut(&(subscription_index)) {
802                                            Some(item_updates) => match item_updates.get_mut(&(item_index)) {
803                                                Some(item_update) => {
804                                                    //
805                                                    // Iterate changed_fields and update existing item_update.fields assigning the new values.
806                                                    //
807                                                    for (field_name, new_value) in &changed_fields {
808                                                        if item_update.fields.contains_key(field_name) {
809                                                            item_update.fields.insert((*field_name).clone(), Some(new_value.clone()));
810                                                        }
811                                                    }
812                                                    item_update.changed_fields = changed_fields.clone();
813                                                    item_update.is_snapshot = is_snapshot;
814                                                    current_item_update = item_update.clone();
815                                                },
816                                                None => {
817                                                    // Create a new item_update and add it to item_updates.
818                                                    let item_update = ItemUpdate {
819                                                        item_name: item.cloned(),
820                                                        item_pos: item_index,
821                                                        fields: field_map.clone(),
822                                                        changed_fields: changed_fields.clone(),
823                                                        is_snapshot,
824                                                    };
825                                                    current_item_update = item_update.clone();
826                                                    item_updates.insert(item_index, item_update);
827                                                }
828                                            },
829                                            None => {
830                                                // Create a new item_update and add it to item_updates.
831                                                let item_update = ItemUpdate {
832                                                    item_name: item.cloned(),
833                                                    item_pos: item_index,
834                                                    fields: field_map,
835                                                    changed_fields,
836                                                    is_snapshot,
837                                                };
838                                                current_item_update = item_update.clone();
839                                                let mut item_updates = HashMap::new();
840                                                item_updates.insert(item_index, item_update);
841                                                subscription_item_updates.insert(subscription_index, item_updates);
842                                            }
843                                        };
844
845                                        // Get mutable subscription listeners directly.
846                                        let subscription_listeners = subscription.get_listeners();
847
848                                        // Iterate subscription listeners and call on_item_update for each listener.
849                                        for listener in subscription_listeners {
850                                            listener.on_item_update(&current_item_update);
851                                        }
852                                    }
853                                    //
854                                    // Connection confirmation from server.
855                                    //
856                                    "wsok" => {
857                                        self.make_log( Level::INFO, &format!("Connection confirmed by server: '{}'", clean_text) );
858                                        //
859                                        // Request session creation.
860                                        //
861                                        let ls_adapter_set = match self.connection_details.get_adapter_set() {
862                                            Some(adapter_set) => adapter_set,
863                                            None => {
864                                                return Err(LightstreamerError::invalid_state(
865                                                    "No adapter set found in connection details.",
866                                                ));
867                                            },
868                                        };
869                                        let ls_send_sync = self.connection_options.get_send_sync().to_string();
870                                        let ls_keepalive_millis = configured_keepalive_ms.to_string();
871                                        let ls_inactivity_millis = reverse_heartbeat_ms.to_string();
872                                        let mut params: Vec<(&str, &str)> = vec![
873                                            ("LS_adapter_set", ls_adapter_set),
874                                            ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"),
875                                            ("LS_send_sync", &ls_send_sync),
876                                        ];
877                                        if configured_keepalive_ms > 0 {
878                                            params.push(("LS_keepalive_millis", &ls_keepalive_millis));
879                                        }
880                                        if reverse_heartbeat_ms > 0 {
881                                            params.push(("LS_inactivity_millis", &ls_inactivity_millis));
882                                        }
883                                        if let Some(user) = &self.connection_details.get_user() {
884                                            params.push(("LS_user", user));
885                                        }
886                                        if let Some(password) = &self.connection_details.get_password() {
887                                            params.push(("LS_password", password));
888                                        }
889                                        params.push(("LS_protocol", Self::TLCP_VERSION));
890                                        let encoded_params = serde_urlencoded::to_string(&params)?;
891                                        write_stream
892                                            .send(Message::Text(format!("create_session\r\n{}\n", encoded_params).into()))
893                                            .await?;
894                                        self.make_log( Level::DEBUG, &format!("Sent create session request: '{}'", encoded_params) );
895                                    },
896                                    unexpected_message => {
897                                        return Err(LightstreamerError::protocol(format!(
898                                            "Unexpected message received from server: '{:?}'",
899                                            unexpected_message
900                                        )));
901                                    },
902                                }
903                            }
904                        },
905                        Some(Ok(non_text_message)) => {
906                            return Err(LightstreamerError::protocol(format!(
907                                "Unexpected non-text message from server: {:?}",
908                                non_text_message
909                            )));
910                        },
911                        Some(Err(err)) => {
912                            return Err(LightstreamerError::protocol(format!(
913                                "Error reading message from server: {}",
914                                err
915                            )));
916                        },
917                        None => {
918                            self.make_log( Level::DEBUG, "No more messages from server" );
919                            break;
920                        },
921                    }
922                },
923                Some(subscription_request) = self.subscription_receiver.recv() => {
924                    request_id += 1;
925                    // Process subscription requests.
926                    if let Some(subscription) = subscription_request.subscription
927                    {
928                        self.subscriptions.push(subscription);
929
930                        // if we are not connected yet, we will subscribe later
931                        if !is_connected {
932                            continue;
933                        }
934
935                        subscription_id += 1;
936                        // SAFETY: We just pushed a subscription, so last_mut() must return Some.
937                        // Using debug_assert to catch invariant violations during development.
938                        let sub = self.subscriptions.last_mut();
939                        debug_assert!(sub.is_some(), "subscriptions.last_mut() returned None after push()");
940                        if let Some(sub) = sub {
941                            sub.id = subscription_id;
942                            sub.id_sender.try_send(subscription_id)?;
943                        }
944
945                        // SAFETY: We just pushed a subscription, so last() must return Some.
946                        // Extract encoded_params in a separate scope to avoid borrow across await.
947                        let encoded_params = {
948                            let last_sub = self.subscriptions.last();
949                            debug_assert!(last_sub.is_some(), "subscriptions.last() returned None after push()");
950                            let Some(last_sub) = last_sub else {
951                                return Err(LightstreamerError::invalid_state(
952                                    "subscriptions.last() returned None immediately after push()"
953                                ));
954                            };
955                            Self::get_subscription_params(last_sub, request_id)?
956                        };
957
958                        write_stream
959                            .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
960                            .await?;
961
962                        self.make_log( Level::INFO, &format!("Sent subscription request: '{}'", encoded_params) );
963                    }
964                    // Process unsubscription requests.
965                    else if let Some(unsubscription_id) = subscription_request.subscription_id
966                    {
967                        let encoded_params = match Self::get_unsubscription_params(unsubscription_id, request_id)
968                        {
969                            Ok(params) => params,
970                            Err(err) => {
971                                return Err(err);
972                            },
973                        };
974
975                        write_stream
976                            .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
977                            .await?;
978
979                        self.make_log( Level::INFO, &format!("Sent unsubscription request: '{}'", encoded_params) );
980
981                        self.subscriptions.retain(|s| s.id != unsubscription_id);
982
983                        if self.subscriptions.is_empty()
984                        {
985                            self.make_log( Level::INFO, "No more subscriptions, disconnecting" );
986                            shutdown_signal.notify_one();
987                        }
988                    }
989                },
990                _ = shutdown_signal.notified() => {
991                    self.make_log( Level::INFO, "Received shutdown signal" );
992                    break;
993                },
994                //
995                // Liveness watchdog: nothing received within the keepalive
996                // window plus grace — the connection is half-open / dead.
997                //
998                _ = tokio::time::sleep_until(liveness_deadline), if keepalive_timeout_ms > 0 => {
999                    let idle_ms = keepalive_timeout_ms.saturating_add(Self::KEEPALIVE_GRACE_MS);
1000                    self.make_log(
1001                        Level::ERROR,
1002                        &format!(
1003                            "No message received from server within {} ms (keepalive + grace); connection considered dead",
1004                            idle_ms
1005                        ),
1006                    );
1007                    return Err(LightstreamerError::connection(format!(
1008                        "no message received from server within {} ms (keepalive + grace)",
1009                        idle_ms
1010                    )));
1011                },
1012                //
1013                // Reverse heartbeat: prove client liveness to the server and
1014                // surface write errors promptly on a dead connection.
1015                //
1016                _ = async {
1017                    match heartbeat_timer.as_mut() {
1018                        Some(timer) => { timer.tick().await; },
1019                        None => std::future::pending::<()>().await,
1020                    }
1021                }, if is_connected => {
1022                    write_stream.send(Message::Text("heartbeat\r\n".into())).await?;
1023                    self.make_log(Level::DEBUG, "Sent reverse heartbeat to server");
1024                },
1025            }
1026        }
1027
1028        Ok(())
1029    }
1030
1031    /// Operation method that requests to close the Session opened against the configured Lightstreamer
1032    /// Server (if any).
1033    ///
1034    /// When `disconnect()` is called, the "Stream-Sense" mechanism is stopped.
1035    ///
1036    /// Note that active `Subscription` instances, associated with this `LightstreamerClient` instance,
1037    /// are preserved to be re-subscribed to on future Sessions.
1038    ///
1039    /// Note that the request to disconnect is accomplished by the client in a separate thread; this
1040    /// means that an invocation to `getStatus()` right after `disconnect()` might not reflect the
1041    /// change yet.
1042    ///
1043    /// When the request to disconnect is finally being executed, if the status of the client is
1044    /// "DISCONNECTED", then nothing will be done.
1045    ///
1046    /// See also `connect()`
1047    #[instrument(level = "trace")]
1048    pub async fn disconnect(&mut self) {
1049        self.make_log(Level::INFO, "Disconnecting from Lightstreamer server");
1050
1051        // If auto-reconnect is enabled and we have a connection manager, stop it
1052        if self.auto_reconnect_enabled {
1053            if let Some(ref manager) = self.connection_manager {
1054                manager.shutdown().await;
1055            }
1056            self.connection_manager = None;
1057        }
1058
1059        // Update status to disconnected
1060        self.status = ClientStatus::Disconnected(DisconnectionType::WillRetry);
1061
1062        // Notify listeners about status change
1063        for listener in &self.listeners {
1064            listener.on_status_change(&self.status.to_string());
1065        }
1066    }
1067
1068    /// Static inquiry method that can be used to share cookies between connections to the Server
1069    /// (performed by this library) and connections to other sites that are performed by the application.
1070    /// With this method, cookies received from the Server can be extracted for sending through other
1071    /// connections, according with the URI to be accessed.
1072    ///
1073    /// See `addCookies()` for clarifications on when cookies are directly stored by the library and
1074    /// when not.
1075    ///
1076    /// # Parameters
1077    ///
1078    /// * `uri`: the URI to which the cookies should be sent, or `None`.
1079    ///
1080    /// # Returns
1081    ///
1082    /// A list with the various cookies that can be sent in a HTTP request for the specified URI.
1083    /// If a `None` URI was supplied, all available non-expired cookies will be returned.
1084    pub fn get_cookies(_uri: Option<&str>) -> Cookie<'_> {
1085        // Implementation for get_cookies
1086        unimplemented!()
1087    }
1088
1089    /// Returns a list containing the `ClientListener` instances that were added to this client.
1090    ///
1091    /// # Returns
1092    ///
1093    /// A list containing the listeners that were added to this client.
1094    ///
1095    /// See also `addListener()`
1096    pub fn get_listeners(&self) -> &Vec<Box<dyn ClientListener>> {
1097        &self.listeners
1098    }
1099
1100    /// Inquiry method that gets the current client status and transport (when applicable).
1101    ///
1102    /// # Returns
1103    ///
1104    /// The current client status. It can be one of the following values:
1105    ///
1106    /// - `"CONNECTING"`: the client is waiting for a Server's response in order to establish a connection;
1107    /// - `"CONNECTED:STREAM-SENSING"`: the client has received a preliminary response from the server
1108    ///   and is currently verifying if a streaming connection is possible;
1109    /// - `"CONNECTED:WS-STREAMING"`: a streaming connection over WebSocket is active;
1110    /// - `"CONNECTED:HTTP-STREAMING"`: a streaming connection over HTTP is active;
1111    /// - `"CONNECTED:WS-POLLING"`: a polling connection over WebSocket is in progress;
1112    /// - `"CONNECTED:HTTP-POLLING"`: a polling connection over HTTP is in progress;
1113    /// - `"STALLED"`: the Server has not been sending data on an active streaming connection for
1114    ///   longer than a configured time;
1115    /// - `"DISCONNECTED:WILL-RETRY"`: no connection is currently active but one will be opened
1116    ///   (possibly after a timeout);
1117    /// - `"DISCONNECTED:TRYING-RECOVERY"`: no connection is currently active, but one will be opened
1118    ///   as soon as possible, as an attempt to recover the current session after a connection issue;
1119    /// - `"DISCONNECTED"`: no connection is currently active.
1120    ///
1121    /// See also `ClientListener.onStatusChange()`
1122    pub fn get_status(&self) -> &ClientStatus {
1123        &self.status
1124    }
1125
1126    /// Inquiry method that returns a list containing all the `Subscription` instances that are
1127    /// currently "active" on this `LightstreamerClient`.
1128    ///
1129    /// Internal second-level `Subscription` are not included.
1130    ///
1131    /// # Returns
1132    ///
1133    /// A list, containing all the `Subscription` currently "active" on this `LightstreamerClient`.
1134    /// The list can be empty.
1135    ///
1136    /// See also `subscribe()`
1137    pub fn get_subscriptions(&self) -> &Vec<Subscription> {
1138        &self.subscriptions
1139    }
1140
1141    /// Creates a new instance of `LightstreamerClient`.
1142    ///
1143    /// The constructor initializes the client with the server address and adapter set, if provided.
1144    /// It sets up the connection details and options for the client. If no server address or
1145    /// adapter set is specified, those properties on the client will be `None`. This allows
1146    /// for late configuration of these details before connecting to the Lightstreamer server.
1147    ///
1148    /// # Arguments
1149    /// * `server_address` - An optional reference to a string slice that represents the server
1150    ///   address to connect to. If `None`, the server address must be set later.
1151    /// * `adapter_set` - An optional reference to a string slice that specifies the adapter set name.
1152    ///   If `None`, the adapter set must be set later.
1153    ///
1154    /// # Returns
1155    /// A result containing the new `LightstreamerClient` instance if successful, or an
1156    /// `IllegalStateException` if the initialization fails due to invalid state conditions.
1157    ///
1158    /// # Panics
1159    /// Does not panic under normal circumstances. However, unexpected internal errors during
1160    /// the creation of internal components could cause panics, which should be considered when
1161    /// using this function in production code.
1162    ///
1163    pub fn new(
1164        server_address: Option<&str>,
1165        adapter_set: Option<&str>,
1166        username: Option<&str>,
1167        password: Option<&str>,
1168    ) -> Result<LightstreamerClient, LightstreamerError> {
1169        let connection_details =
1170            ConnectionDetails::new(server_address, adapter_set, username, password)?;
1171        let connection_options = ConnectionOptions::default();
1172        let (subscription_sender, subscription_receiver) = channel(100);
1173
1174        Ok(LightstreamerClient {
1175            server_address: server_address.map(|s| s.to_string()),
1176            adapter_set: adapter_set.map(|s| s.to_string()),
1177            connection_details,
1178            connection_options,
1179            listeners: Vec::new(),
1180            subscriptions: Vec::new(),
1181            status: ClientStatus::Disconnected(DisconnectionType::WillRetry),
1182            logging: LogType::StdLogs,
1183            subscription_sender,
1184            subscription_receiver,
1185            connection_manager: None,
1186            reconnection_config: ReconnectionConfig::default(),
1187            heartbeat_config: HeartbeatConfig::default(),
1188            auto_reconnect_enabled: false,
1189        })
1190    }
1191
1192    /// Removes a listener from the `LightstreamerClient` instance so that it will not receive
1193    /// events anymore.
1194    ///
1195    /// A listener can be removed at any time.
1196    ///
1197    /// # Parameters
1198    ///
1199    /// * `listener`: The listener to be removed.
1200    ///
1201    /// See also `addListener()`
1202    pub fn remove_listener(&mut self, _listener: Box<dyn ClientListener>) {
1203        unimplemented!("Implement mechanism to remove listener from LightstreamerClient");
1204        //self.listeners.remove(&listener);
1205    }
1206
1207    /// Operation method that sends a message to the Server. The message is interpreted and handled
1208    /// by the Metadata Adapter associated to the current Session. This operation supports in-order
1209    /// guaranteed message delivery with automatic batching. In other words, messages are guaranteed
1210    /// to arrive exactly once and respecting the original order, whatever is the underlying transport
1211    /// (HTTP or WebSockets). Furthermore, high frequency messages are automatically batched, if
1212    /// necessary, to reduce network round trips.
1213    ///
1214    /// Upon subsequent calls to the method, the sequential management of the involved messages is
1215    /// guaranteed. The ordering is determined by the order in which the calls to `sendMessage` are
1216    /// issued.
1217    ///
1218    /// If a message, for any reason, doesn't reach the Server (this is possible with the HTTP transport),
1219    /// it will be resent; however, this may cause the subsequent messages to be delayed. For this
1220    /// reason, each message can specify a "delayTimeout", which is the longest time the message,
1221    /// after reaching the Server, can be kept waiting if one of more preceding messages haven't
1222    /// been received yet. If the "delayTimeout" expires, these preceding messages will be discarded;
1223    /// any discarded message will be notified to the listener through `ClientMessageListener.onDiscarded()`.
1224    /// Note that, because of the parallel transport of the messages, if a zero or very low timeout
1225    /// is set for a message and the previous message was sent immediately before, it is possible
1226    /// that the latter gets discarded even if no communication issues occur. The Server may also
1227    /// enforce its own timeout on missing messages, to prevent keeping the subsequent messages for
1228    /// long time.
1229    ///
1230    /// Sequence identifiers can also be associated with the messages. In this case, the sequential
1231    /// management is restricted to all subsets of messages with the same sequence identifier associated.
1232    ///
1233    /// Notifications of the operation outcome can be received by supplying a suitable listener. The
1234    /// supplied listener is guaranteed to be eventually invoked; listeners associated with a sequence
1235    /// are guaranteed to be invoked sequentially.
1236    ///
1237    /// The "UNORDERED_MESSAGES" sequence name has a special meaning. For such a sequence, immediate
1238    /// processing is guaranteed, while strict ordering and even sequentialization of the processing
1239    /// is not enforced. Likewise, strict ordering of the notifications is not enforced. However,
1240    /// messages that, for any reason, should fail to reach the Server whereas subsequent messages
1241    /// had succeeded, might still be discarded after a server-side timeout, in order to ensure that
1242    /// the listener eventually gets a notification.
1243    ///
1244    /// Moreover, if "UNORDERED_MESSAGES" is used and no listener is supplied, a "fire and forget"
1245    /// scenario is assumed. In this case, no checks on missing, duplicated or overtaken messages
1246    /// are performed at all, so as to optimize the processing and allow the highest possible throughput.
1247    ///
1248    /// Since a message is handled by the Metadata Adapter associated to the current connection, a
1249    /// message can be sent only if a connection is currently active. If the special `enqueueWhileDisconnected`
1250    /// flag is specified it is possible to call the method at any time and the client will take
1251    /// care of sending the message as soon as a connection is available, otherwise, if the current
1252    /// status is "DISCONNECTED*", the message will be abandoned and the `ClientMessageListener.onAbort()`
1253    /// event will be fired.
1254    ///
1255    /// Note that, in any case, as soon as the status switches again to "DISCONNECTED*", any message
1256    /// still pending is aborted, including messages that were queued with the `enqueueWhileDisconnected`
1257    /// flag set to `true`.
1258    ///
1259    /// Also note that forwarding of the message to the server is made in a separate thread, hence,
1260    /// if a message is sent while the connection is active, it could be aborted because of a subsequent
1261    /// disconnection. In the same way a message sent while the connection is not active might be
1262    /// sent because of a subsequent connection.
1263    ///
1264    /// # Parameters
1265    ///
1266    /// * `message`: a text message, whose interpretation is entirely demanded to the Metadata Adapter
1267    ///   associated to the current connection.
1268    /// * `sequence`: an alphanumeric identifier, used to identify a subset of messages to be managed
1269    ///   in sequence; underscore characters are also allowed. If the "UNORDERED_MESSAGES" identifier
1270    ///   is supplied, the message will be processed in the special way described above. The parameter
1271    ///   is optional; if set to `None`, "UNORDERED_MESSAGES" is used as the sequence name.
1272    /// * `delay_timeout`: a timeout, expressed in milliseconds. If higher than the Server configured
1273    ///   timeout on missing messages, the latter will be used instead. The parameter is optional; if
1274    ///   a negative value is supplied, the Server configured timeout on missing messages will be applied.
1275    ///   This timeout is ignored for the special "UNORDERED_MESSAGES" sequence, although a server-side
1276    ///   timeout on missing messages still applies.
1277    /// * `listener`: an object suitable for receiving notifications about the processing outcome. The
1278    ///   parameter is optional; if not supplied, no notification will be available.
1279    /// * `enqueue_while_disconnected`: if this flag is set to `true`, and the client is in a disconnected
1280    ///   status when the provided message is handled, then the message is not aborted right away but
1281    ///   is queued waiting for a new session. Note that the message can still be aborted later when
1282    ///   a new session is established.
1283    pub fn send_message(
1284        &mut self,
1285        message: &str,
1286        sequence: Option<&str>,
1287        _delay_timeout: Option<u64>,
1288        listener: Option<Box<dyn ClientMessageListener>>,
1289        enqueue_while_disconnected: bool,
1290    ) {
1291        let _sequence = sequence.unwrap_or("UNORDERED_MESSAGES");
1292
1293        // Handle the message based on the current connection status
1294        match &self.status {
1295            ClientStatus::Connected(_connection_type) => {
1296                // Send the message to the server in a separate thread
1297                // ...
1298            }
1299            ClientStatus::Disconnected(_disconnection_type) => {
1300                if enqueue_while_disconnected {
1301                    // Enqueue the message to be sent when a connection is available
1302                    // ...
1303                } else {
1304                    // Abort the message and notify the listener
1305                    if let Some(listener) = listener {
1306                        listener.on_abort(message, false);
1307                    }
1308                }
1309            }
1310            _ => {
1311                // Enqueue the message to be sent when a connection is available
1312                // ...
1313            }
1314        }
1315        unimplemented!("Complete mechanism to send message to LightstreamerClient.");
1316    }
1317
1318    /// Static method that permits to configure the logging system used by the library. The logging
1319    /// system must respect the `LoggerProvider` interface. A custom class can be used to wrap any
1320    /// third-party logging system.
1321    ///
1322    /// If no logging system is specified, all the generated log is discarded.
1323    ///
1324    /// The following categories are available to be consumed:
1325    ///
1326    /// - `lightstreamer.stream`: logs socket activity on Lightstreamer Server connections; at INFO
1327    ///   level, socket operations are logged; at DEBUG level, read/write data exchange is logged.
1328    /// - `lightstreamer.protocol`: logs requests to Lightstreamer Server and Server answers; at INFO
1329    ///   level, requests are logged; at DEBUG level, request details and events from the Server are logged.
1330    /// - `lightstreamer.session`: logs Server Session lifecycle events; at INFO level, lifecycle events
1331    ///   are logged; at DEBUG level, lifecycle event details are logged.
1332    /// - `lightstreamer.subscriptions`: logs subscription requests received by the clients and the related
1333    ///   updates; at WARN level, alert events from the Server are logged; at INFO level, subscriptions
1334    ///   and unsubscriptions are logged; at DEBUG level, requests batching and update details are logged.
1335    /// - `lightstreamer.actions`: logs settings / API calls.
1336    ///
1337    /// # Parameters
1338    ///
1339    /// * `provider`: A `LoggerProvider` instance that will be used to generate log messages by the
1340    ///   library classes.
1341    pub fn set_logger_provider() {
1342        unimplemented!("Implement mechanism to set logger provider for LightstreamerClient.");
1343    }
1344    /*
1345    pub fn set_logger_provider(provider: LoggerProvider) {
1346        // Implementation for set_logger_provider
1347    }
1348    */
1349
1350    /// Provides a mean to control the way TLS certificates are evaluated, with the possibility to
1351    /// accept untrusted ones.
1352    ///
1353    /// May be called only once before creating any `LightstreamerClient` instance.
1354    ///
1355    /// # Parameters
1356    ///
1357    /// * `factory`: an instance of `ssl.SSLContext`
1358    ///
1359    /// # Raises
1360    ///
1361    /// * `IllegalArgumentException`: if the factory is `None`
1362    /// * `IllegalStateException`: if a factory is already installed
1363    pub fn set_trust_manager_factory() {
1364        unimplemented!("Implement mechanism to set trust manager factory for LightstreamerClient.");
1365    }
1366    /*
1367    pub fn set_trust_manager_factory(factory: Option<SslContext>) -> Result<(), IllegalArgumentException> {
1368        if factory.is_none() {
1369            return Err(IllegalArgumentException::new(
1370                "Factory cannot be None",
1371            ));
1372        }
1373
1374        // Implementation for set_trust_manager_factory
1375        Ok(())
1376    }
1377    */
1378
1379    /// Adds a subscription to the `LightstreamerClient` instance.
1380    ///
1381    /// Active subscriptions are subscribed to through the server as soon as possible (i.e. as soon
1382    /// as there is a session available). Active `Subscription` are automatically persisted across different
1383    /// sessions as long as a related unsubscribe call is not issued.
1384    ///
1385    /// Subscriptions can be given to the `LightstreamerClient` at any time. Once done the `Subscription`
1386    /// immediately enters the "active" state.
1387    ///
1388    /// Once "active", a `Subscription` instance cannot be provided again to a `LightstreamerClient`
1389    /// unless it is first removed from the "active" state through a call to `unsubscribe()`.
1390    ///
1391    /// Also note that forwarding of the subscription to the server is made in a separate thread.
1392    ///
1393    /// A successful subscription to the server will be notified through a `SubscriptionListener.onSubscription()`
1394    /// event.
1395    ///
1396    /// # Parameters
1397    ///
1398    /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1399    /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1400    ///   values.
1401    ///
1402    /// See also `unsubscribe()`
1403    ///
1404    /// # Errors
1405    ///
1406    /// Returns an error if the subscription channel is closed.
1407    pub async fn subscribe(
1408        subscription_sender: Sender<SubscriptionRequest>,
1409        subscription: Subscription,
1410    ) -> Result<(), LightstreamerError> {
1411        subscription_sender
1412            .send(SubscriptionRequest {
1413                subscription: Some(subscription),
1414                subscription_id: None,
1415            })
1416            .await
1417            .map_err(|e| LightstreamerError::channel(format!("Failed to send subscription: {}", e)))
1418    }
1419
1420    /// If you want to be able to unsubscribe from a subscription, you need to keep track of the id
1421    /// of the subscription. This blocking method allows you to wait for the id of the subscription
1422    /// to be returned.
1423    ///
1424    /// # Parameters
1425    ///
1426    /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1427    /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1428    ///
1429    pub async fn subscribe_get_id(
1430        subscription_sender: Sender<SubscriptionRequest>,
1431        mut subscription: Subscription,
1432    ) -> Result<usize, LightstreamerError> {
1433        // Extract the id_receiver before sending the subscription
1434        let mut id_receiver = subscription.id_receiver;
1435
1436        // Create a new channel for the subscription we're about to send
1437        let (_new_sender, new_receiver) = channel(1);
1438        subscription.id_receiver = new_receiver;
1439
1440        // Send the subscription
1441        LightstreamerClient::subscribe(subscription_sender, subscription).await?;
1442
1443        // Wait for the ID to be updated through the channel
1444        match id_receiver.recv().await {
1445            Some(id) => Ok(id),
1446            None => Err(LightstreamerError::invalid_state(
1447                "Failed to get subscription id",
1448            )),
1449        }
1450    }
1451
1452    /// Operation method that removes a `Subscription` that is currently in the "active" state.
1453    ///
1454    /// By bringing back a `Subscription` to the "inactive" state, the unsubscription from all its
1455    /// items is requested to Lightstreamer Server.
1456    ///
1457    /// Subscription can be unsubscribed from at any time. Once done the `Subscription` immediately
1458    /// exits the "active" state.
1459    ///
1460    /// Note that forwarding of the unsubscription to the server is made in a separate thread.
1461    ///
1462    /// The unsubscription will be notified through a `SubscriptionListener.onUnsubscription()` event.
1463    ///
1464    /// # Parameters
1465    ///
1466    /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1467    /// * `subscription_id`: The id of the subscription to be unsubscribed from.
1468    ///   instance.
1469    ///
1470    /// # Errors
1471    ///
1472    /// Returns an error if the subscription channel is closed.
1473    pub async fn unsubscribe(
1474        subscription_sender: Sender<SubscriptionRequest>,
1475        subscription_id: usize,
1476    ) -> Result<(), LightstreamerError> {
1477        subscription_sender
1478            .send(SubscriptionRequest {
1479                subscription: None,
1480                subscription_id: Some(subscription_id),
1481            })
1482            .await
1483            .map_err(|e| {
1484                LightstreamerError::channel(format!("Failed to send unsubscription: {}", e))
1485            })
1486    }
1487
1488    /// Method setting enum for the logging of this instance.
1489    ///
1490    /// Default logging type is StdLogs, corresponding to `stdout`
1491    ///
1492    /// `LightstreamerClient` has methods for logging that are compatible with the `Tracing` crate.
1493    /// Enabling logging for the `Tracing` crate requires implementation of a tracing subscriber
1494    /// and its configuration and formatting.
1495    ///
1496    /// # Parameters
1497    ///
1498    /// * `logging`: An enum declaring the logging type of this `LightstreamerClient` instance.
1499    pub fn set_logging_type(&mut self, logging: LogType) {
1500        self.logging = logging;
1501    }
1502
1503    /// Method for logging messages
1504    ///
1505    /// Match case wraps log types. `loglevel` param ignored in StdLogs case, all output to stdout.
1506    ///
1507    /// # Parameters
1508    ///
1509    /// * `loglevel` Enum determining use of stdout or Tracing subscriber.
1510    pub fn make_log(&mut self, loglevel: Level, log: &str) {
1511        match self.logging {
1512            LogType::StdLogs => {
1513                debug!("{}", log);
1514            }
1515            LogType::TracingLogs => match loglevel {
1516                Level::INFO => {
1517                    info!(log);
1518                }
1519                Level::WARN => {
1520                    warn!(log);
1521                }
1522                Level::ERROR => {
1523                    error!(log);
1524                }
1525                Level::TRACE => {
1526                    trace!(log);
1527                }
1528                Level::DEBUG => {
1529                    debug!(log);
1530                }
1531            },
1532        }
1533    }
1534
1535    /// Enables automatic reconnection with default configuration.
1536    pub fn enable_auto_reconnect(&mut self) {
1537        self.auto_reconnect_enabled = true;
1538        // ConnectionManager will be created during connect() to avoid circular dependency
1539    }
1540
1541    /// Enables automatic reconnection with the specified configuration.
1542    ///
1543    /// # Parameters
1544    ///
1545    /// * `reconnection_config`: Configuration for reconnection behavior
1546    /// * `heartbeat_config`: Configuration for heartbeat monitoring
1547    pub fn enable_auto_reconnect_with_config(
1548        &mut self,
1549        reconnection_config: ReconnectionConfig,
1550        heartbeat_config: HeartbeatConfig,
1551    ) -> Result<(), LightstreamerError> {
1552        self.auto_reconnect_enabled = true;
1553        self.reconnection_config = reconnection_config;
1554        self.heartbeat_config = heartbeat_config;
1555        self.auto_reconnect_enabled = true;
1556        // ConnectionManager will be created during connect() to avoid circular dependency
1557        Ok(())
1558    }
1559
1560    /// Disables automatic reconnection.
1561    pub async fn disable_auto_reconnect(&mut self) {
1562        self.auto_reconnect_enabled = false;
1563        if let Some(manager) = &self.connection_manager {
1564            manager.shutdown().await;
1565        }
1566        self.connection_manager = None;
1567    }
1568
1569    /// Returns whether automatic reconnection is currently enabled.
1570    pub fn is_auto_reconnect_enabled(&self) -> bool {
1571        self.auto_reconnect_enabled
1572    }
1573
1574    /// Gets the current reconnection configuration.
1575    pub fn get_reconnection_config(&self) -> &ReconnectionConfig {
1576        &self.reconnection_config
1577    }
1578
1579    /// Gets the current heartbeat configuration.
1580    pub fn get_heartbeat_config(&self) -> &HeartbeatConfig {
1581        &self.heartbeat_config
1582    }
1583
1584    /// Updates the reconnection configuration.
1585    pub fn set_reconnection_config(&mut self, config: ReconnectionConfig) {
1586        self.reconnection_config = config;
1587        // Note: ConnectionManager doesn't support runtime config updates
1588        // The configuration is set during ConnectionManager creation
1589    }
1590
1591    /// Updates the heartbeat configuration.
1592    pub fn set_heartbeat_config(&mut self, config: HeartbeatConfig) {
1593        self.heartbeat_config = config;
1594        // Note: ConnectionManager doesn't support runtime config updates
1595        // The configuration is set during ConnectionManager creation
1596    }
1597
1598    /// Gets the current connection state from the connection manager.
1599    pub async fn get_connection_state(&self) -> ConnectionState {
1600        if let Some(manager) = &self.connection_manager {
1601            manager.get_connection_state().await
1602        } else {
1603            ConnectionState::Disconnected
1604        }
1605    }
1606
1607    /// Gets connection metrics from the connection manager.
1608    pub async fn get_connection_metrics(&self) -> crate::connection::management::ConnectionMetrics {
1609        if let Some(manager) = &self.connection_manager {
1610            manager.get_metrics().await
1611        } else {
1612            crate::connection::management::ConnectionMetrics::default()
1613        }
1614    }
1615
1616    /// Forces an immediate reconnection attempt if auto-reconnect is enabled.
1617    pub async fn force_reconnect(&mut self) -> Result<(), LightstreamerError> {
1618        if !self.auto_reconnect_enabled {
1619            return Err(LightstreamerError::invalid_state(
1620                "Auto-reconnect is not enabled",
1621            ));
1622        }
1623
1624        if let Some(manager) = &mut self.connection_manager {
1625            manager.force_reconnect().await?;
1626        }
1627
1628        Ok(())
1629    }
1630}
1631
1632/// Computes the effective liveness window after the server declares its
1633/// keepalive interval in the `conok` message.
1634///
1635/// Returns the server-declared value when no window was configured
1636/// (`current_ms == 0`), otherwise the larger of the two — the window may only
1637/// widen, never shrink, so a server declaring a longer keepalive cannot cause
1638/// false-positive disconnects.
1639#[must_use]
1640#[inline]
1641fn widen_keepalive_timeout(current_ms: u64, server_declared_ms: u64) -> u64 {
1642    if current_ms == 0 {
1643        server_declared_ms
1644    } else {
1645        current_ms.max(server_declared_ms)
1646    }
1647}
1648
1649#[cfg(test)]
1650mod tests {
1651    use super::*;
1652    use crate::subscription::{Subscription, SubscriptionListener, SubscriptionMode};
1653
1654    use std::fmt::Debug;
1655    use std::sync::{Arc, Mutex};
1656    use tokio::sync::Notify;
1657
1658    #[derive(Debug)]
1659    struct MockClientListener {
1660        property_changes: Arc<Mutex<Vec<String>>>,
1661        status_changes: Arc<Mutex<Vec<String>>>,
1662        server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1663    }
1664
1665    impl MockClientListener {
1666        fn new() -> Self {
1667            MockClientListener {
1668                property_changes: Arc::new(Mutex::new(Vec::new())),
1669                status_changes: Arc::new(Mutex::new(Vec::new())),
1670                server_errors: Arc::new(Mutex::new(Vec::new())),
1671            }
1672        }
1673
1674        #[allow(dead_code)]
1675        fn with_shared_data(
1676            property_changes: Arc<Mutex<Vec<String>>>,
1677            status_changes: Arc<Mutex<Vec<String>>>,
1678            server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1679        ) -> Self {
1680            MockClientListener {
1681                property_changes,
1682                status_changes,
1683                server_errors,
1684            }
1685        }
1686    }
1687
1688    impl ClientListener for MockClientListener {
1689        fn on_property_change(&self, property: &str) {
1690            if let Ok(mut guard) = self.property_changes.lock() {
1691                guard.push(property.to_string());
1692            }
1693        }
1694
1695        fn on_status_change(&self, status: &str) {
1696            if let Ok(mut guard) = self.status_changes.lock() {
1697                guard.push(status.to_string());
1698            }
1699        }
1700
1701        fn on_server_error(&self, code: i32, message: &str) {
1702            if let Ok(mut guard) = self.server_errors.lock() {
1703                guard.push((code, message.to_string()));
1704            }
1705        }
1706    }
1707
1708    #[allow(dead_code)]
1709    struct MockSubscriptionListener;
1710
1711    impl SubscriptionListener for MockSubscriptionListener {
1712        fn on_subscription(&mut self) {}
1713        fn on_unsubscription(&mut self) {}
1714        fn on_item_update(&self, _update: &ItemUpdate) {}
1715    }
1716
1717    impl Debug for MockSubscriptionListener {
1718        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1719            write!(f, "MockSubscriptionListener")
1720        }
1721    }
1722
1723    #[allow(dead_code)]
1724    struct LightstreamerClientTestContext {
1725        client: LightstreamerClient,
1726        property_changes: Arc<Mutex<Vec<String>>>,
1727        status_changes: Arc<Mutex<Vec<String>>>,
1728        server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1729    }
1730
1731    impl LightstreamerClientTestContext {
1732        #[allow(dead_code)]
1733        fn new() -> Result<Self, LightstreamerError> {
1734            let property_changes = Arc::new(Mutex::new(Vec::new()));
1735            let status_changes = Arc::new(Mutex::new(Vec::new()));
1736            let server_errors = Arc::new(Mutex::new(Vec::new()));
1737            let listener = MockClientListener::with_shared_data(
1738                Arc::clone(&property_changes),
1739                Arc::clone(&status_changes),
1740                Arc::clone(&server_errors),
1741            );
1742
1743            let mut client = LightstreamerClient::new(
1744                Some("http://test.lightstreamer.com"),
1745                Some("DEMO"),
1746                None,
1747                None,
1748            )?;
1749            client.add_listener(Box::new(listener));
1750
1751            Ok(LightstreamerClientTestContext {
1752                client,
1753                property_changes,
1754                status_changes,
1755                server_errors,
1756            })
1757        }
1758    }
1759
1760    #[test]
1761    fn test_new_lightstreamer_client() -> Result<(), LightstreamerError> {
1762        let client = LightstreamerClient::new(
1763            Some("http://test.lightstreamer.com"),
1764            Some("DEMO"),
1765            None,
1766            None,
1767        )?;
1768        assert_eq!(
1769            client.server_address,
1770            Some("http://test.lightstreamer.com".to_string())
1771        );
1772        assert_eq!(client.adapter_set, Some("DEMO".to_string()));
1773        let client = LightstreamerClient::new(
1774            Some("http://test.lightstreamer.com"),
1775            Some("DEMO"),
1776            Some("user1"),
1777            Some("pass1"),
1778        )?;
1779        assert_eq!(
1780            client.connection_details.get_user(),
1781            Some(&"user1".to_string())
1782        );
1783        assert_eq!(
1784            client.connection_details.get_password(),
1785            Some(&"pass1".to_string())
1786        );
1787        let result = LightstreamerClient::new(Some("invalid-url"), Some("DEMO"), None, None);
1788        assert!(result.is_err());
1789        let client = LightstreamerClient::new(None, Some("DEMO"), None, None)?;
1790        assert_eq!(client.server_address, None);
1791        Ok(())
1792    }
1793
1794    #[test]
1795    fn test_add_listener() -> Result<(), LightstreamerError> {
1796        let mut client = LightstreamerClient::new(
1797            Some("http://test.lightstreamer.com"),
1798            Some("DEMO"),
1799            None,
1800            None,
1801        )?;
1802        assert_eq!(client.listeners.len(), 0);
1803        let listener = Box::new(MockClientListener::new());
1804        client.add_listener(listener);
1805        assert_eq!(client.listeners.len(), 1);
1806        let listener2 = Box::new(MockClientListener::new());
1807        client.add_listener(listener2);
1808        assert_eq!(client.listeners.len(), 2);
1809        Ok(())
1810    }
1811
1812    #[test]
1813    fn test_get_listeners() -> Result<(), LightstreamerError> {
1814        let mut client = LightstreamerClient::new(
1815            Some("http://test.lightstreamer.com"),
1816            Some("DEMO"),
1817            None,
1818            None,
1819        )?;
1820        assert_eq!(client.get_listeners().len(), 0);
1821
1822        let listener = Box::new(MockClientListener::new());
1823        client.add_listener(listener);
1824        assert_eq!(client.get_listeners().len(), 1);
1825        Ok(())
1826    }
1827
1828    #[test]
1829    fn test_get_status() -> Result<(), LightstreamerError> {
1830        let client = LightstreamerClient::new(
1831            Some("http://test.lightstreamer.com"),
1832            Some("DEMO"),
1833            None,
1834            None,
1835        )?;
1836        match client.get_status() {
1837            ClientStatus::Disconnected(DisconnectionType::WillRetry) => {}
1838            _ => panic!("Expected initial status to be DISCONNECTED:WILL-RETRY"),
1839        }
1840        Ok(())
1841    }
1842
1843    #[test]
1844    fn test_get_subscriptions() -> Result<(), LightstreamerError> {
1845        let client = LightstreamerClient::new(
1846            Some("http://test.lightstreamer.com"),
1847            Some("DEMO"),
1848            None,
1849            None,
1850        )?;
1851        assert_eq!(client.get_subscriptions().len(), 0);
1852        Ok(())
1853    }
1854
1855    #[tokio::test]
1856    async fn test_connect_with_no_server_address() -> Result<(), LightstreamerError> {
1857        let client = LightstreamerClient::new(None, Some("DEMO"), None, None)?;
1858        let client_arc = Arc::new(tokio::sync::Mutex::new(client));
1859        let shutdown_signal = Arc::new(Notify::new());
1860        let result = LightstreamerClient::connect(client_arc, shutdown_signal).await;
1861        assert!(result.is_err());
1862        Ok(())
1863    }
1864
1865    #[tokio::test]
1866    async fn test_forced_transport_validation() -> Result<(), LightstreamerError> {
1867        let mut client = LightstreamerClient::new(
1868            Some("http://test.lightstreamer.com"),
1869            Some("DEMO"),
1870            None,
1871            None,
1872        )?;
1873
1874        client.connection_options.set_forced_transport(None);
1875        let client_arc = Arc::new(tokio::sync::Mutex::new(client));
1876        let shutdown_signal = Arc::new(Notify::new());
1877        let result = LightstreamerClient::connect(client_arc.clone(), shutdown_signal).await;
1878        assert!(result.is_err());
1879        client_arc
1880            .lock()
1881            .await
1882            .connection_options
1883            .set_forced_transport(Some(Transport::WsStreaming));
1884        Ok(())
1885    }
1886
1887    #[test]
1888    fn test_subscription_params_generation() -> Result<(), LightstreamerError> {
1889        let subscription = Subscription::new(
1890            SubscriptionMode::Merge,
1891            Some(vec!["item1".to_string(), "item2".to_string()]),
1892            Some(vec!["field1".to_string(), "field2".to_string()]),
1893        )?;
1894
1895        let params_str = LightstreamerClient::get_subscription_params(&subscription, 1)?;
1896
1897        assert!(params_str.contains("LS_reqId=1"));
1898        assert!(params_str.contains("LS_op=add"));
1899        assert!(params_str.contains("LS_subId="));
1900        assert!(params_str.contains("LS_mode=MERGE"));
1901        assert!(params_str.contains("LS_group="));
1902        assert!(params_str.contains("LS_schema="));
1903        Ok(())
1904    }
1905
1906    #[test]
1907    fn test_unsubscription_params_generation() -> Result<(), LightstreamerError> {
1908        let params_str = LightstreamerClient::get_unsubscription_params(42, 123)?;
1909
1910        assert!(params_str.contains("LS_reqId=123"));
1911        assert!(params_str.contains("LS_op=delete"));
1912        assert!(params_str.contains("LS_subId=42"));
1913        Ok(())
1914    }
1915
1916    #[test]
1917    fn test_logging_functions() -> Result<(), LightstreamerError> {
1918        let mut client = LightstreamerClient::new(
1919            Some("http://test.lightstreamer.com"),
1920            Some("DEMO"),
1921            None,
1922            None,
1923        )?;
1924
1925        client.set_logging_type(LogType::StdLogs);
1926
1927        client.make_log(Level::INFO, "Test log message");
1928        client.make_log(Level::DEBUG, "Test debug message");
1929        client.set_logging_type(LogType::TracingLogs);
1930        client.make_log(Level::INFO, "Test tracing log message");
1931        client.make_log(Level::DEBUG, "Test tracing debug message");
1932        Ok(())
1933    }
1934
1935    #[test]
1936    fn test_debug_implementation() -> Result<(), LightstreamerError> {
1937        let client = LightstreamerClient::new(
1938            Some("http://test.lightstreamer.com"),
1939            Some("DEMO"),
1940            None,
1941            None,
1942        )?;
1943
1944        // Test that Debug implementation works without panicking
1945        let debug_string = format!("{:?}", client);
1946
1947        // Verify it contains expected fields
1948        assert!(debug_string.contains("server_address"));
1949        assert!(debug_string.contains("adapter_set"));
1950        assert!(debug_string.contains("connection_details"));
1951        assert!(debug_string.contains("connection_options"));
1952        assert!(debug_string.contains("listeners"));
1953        assert!(debug_string.contains("subscriptions"));
1954
1955        // Verify the values are included
1956        assert!(debug_string.contains("http://test.lightstreamer.com"));
1957        assert!(debug_string.contains("DEMO"));
1958        Ok(())
1959    }
1960
1961    #[test]
1962    #[should_panic(expected = "Implement mechanism to add cookies to LightstreamerClient")]
1963    fn test_add_cookies() {
1964        // Test the static method add_cookies
1965        let cookie = Cookie::new("test_cookie", "test_value");
1966        LightstreamerClient::add_cookies("http://test.lightstreamer.com", &cookie);
1967    }
1968
1969    #[test]
1970    #[should_panic(expected = "not implemented")]
1971    fn test_get_cookies() {
1972        // Test the static method get_cookies
1973        LightstreamerClient::get_cookies(Some("http://test.lightstreamer.com"));
1974    }
1975}
1976
1977#[cfg(test)]
1978mod liveness_tests {
1979    use super::widen_keepalive_timeout;
1980
1981    #[test]
1982    fn test_widen_keepalive_timeout_unconfigured_adopts_server_value() {
1983        assert_eq!(widen_keepalive_timeout(0, 5_000), 5_000);
1984    }
1985
1986    #[test]
1987    fn test_widen_keepalive_timeout_server_longer_widens() {
1988        assert_eq!(widen_keepalive_timeout(15_000, 30_000), 30_000);
1989    }
1990
1991    #[test]
1992    fn test_widen_keepalive_timeout_server_shorter_never_shrinks() {
1993        assert_eq!(widen_keepalive_timeout(15_000, 5_000), 15_000);
1994    }
1995
1996    #[test]
1997    fn test_widen_keepalive_timeout_equal_values_unchanged() {
1998        assert_eq!(widen_keepalive_timeout(15_000, 15_000), 15_000);
1999    }
2000}