lightstreamer_rs/client/
implementation.rs

1use crate::subscription::{ItemUpdate, Snapshot, Subscription, SubscriptionMode};
2
3use crate::client::Transport;
4pub(crate) use crate::client::listener::ClientListener;
5use crate::client::message_listener::ClientMessageListener;
6use crate::client::model::{ClientStatus, DisconnectionType, LogType};
7use crate::client::request::SubscriptionRequest;
8use crate::client::utils::get_subscription_by_id;
9use crate::connection::{ConnectionDetails, ConnectionOptions};
10use crate::connection::{ConnectionManager, ConnectionState, HeartbeatConfig, ReconnectionConfig};
11use crate::utils::{IllegalStateException, clean_message, parse_arguments};
12use cookie::Cookie;
13use futures_util::{SinkExt, StreamExt};
14use std::collections::HashMap;
15use std::error::Error;
16use std::fmt::{self, Debug, Formatter};
17use std::sync::Arc;
18use tokio::sync::mpsc::channel;
19use tokio::sync::{
20    Mutex, Notify,
21    mpsc::{Receiver, Sender},
22};
23use tokio_tungstenite::{
24    connect_async,
25    tungstenite::{
26        Message,
27        http::{HeaderName, HeaderValue, Request},
28    },
29};
30use tracing::{Level, debug, error, info, instrument, trace, warn};
31use url::Url;
32
/// Facade for managing the communication with a Lightstreamer Server. It is used to provide
/// configuration settings, event handlers, operations for controlling the connection lifecycle,
/// Subscription handling and message sending.
///
/// An instance of `LightstreamerClient` handles the communication with Lightstreamer Server on a
/// specified endpoint. Hence, it hosts one "Session"; or, more precisely, a sequence of Sessions,
/// since any Session may fail and be recovered, or it can be interrupted on purpose. So, normally,
/// a single instance of `LightstreamerClient` is needed.
///
/// However, multiple instances of `LightstreamerClient` can be used, toward the same or multiple
/// endpoints.
///
/// You can listen to the events generated by a session by registering an event listener, such as
/// `ClientListener` or `SubscriptionListener`. These listeners allow you to handle various events,
/// such as session creation, connection status, subscription updates, and server messages.
///
/// NOTE(review): the upstream Lightstreamer SDK documentation states that all notifications are
/// dispatched by a single shared "event thread", so slow or blocking listener code delays every
/// other listener and should be delegated to a dedicated thread. Whether this implementation
/// really uses one shared event thread is not visible from this type — confirm before relying on
/// it. Keeping listener methods fast and simple is good practice either way.
///
/// # Parameters
///
/// NOTE(review): the sections below describe the constructor arguments and errors; the
/// constructor itself is defined elsewhere in this file, so verify them against it.
///
/// * `server_address`: the address of the Lightstreamer Server to which this `LightstreamerClient`
///   will connect. It is possible to specify it later by using `None` here. See
///   `ConnectionDetails.setServerAddress()` for details.
/// * `adapter_set`: the name of the Adapter Set mounted on Lightstreamer Server to be used to handle
///   all requests in the Session associated with this `LightstreamerClient`. It is possible not to
///   specify it at all or to specify it later by using `None` here. See
///   `ConnectionDetails.setAdapterSet()` for details.
///
/// # Raises
///
/// * `IllegalArgumentException`: if an invalid address is passed. See
///   `ConnectionDetails.setServerAddress()` for details.
pub struct LightstreamerClient {
    /// The address of the Lightstreamer Server to which this `LightstreamerClient` will connect.
    /// `None` means that no address has been configured yet.
    server_address: Option<String>,
    /// The name of the Adapter Set mounted on Lightstreamer Server to be used to handle all
    /// requests in the Session associated with this `LightstreamerClient`.
    adapter_set: Option<String>,
    /// Data object that contains the details needed to open a connection to a Lightstreamer Server.
    /// This instance is set up by the `LightstreamerClient` object at its own creation. Properties
    /// of this object can be overwritten by values received from a Lightstreamer Server.
    pub connection_details: ConnectionDetails,
    /// Data object that contains options and policies for the connection to the server. This instance
    /// is set up by the `LightstreamerClient` object at its own creation. Properties of this object
    /// can be overwritten by values received from a Lightstreamer Server.
    pub connection_options: ConnectionOptions,
    /// The listeners that will receive events from this `LightstreamerClient` instance, in the
    /// order in which they were added.
    listeners: Vec<Box<dyn ClientListener>>,
    /// A list containing all the `Subscription` instances that are currently "active" on this
    /// `LightstreamerClient`.
    subscriptions: Vec<Subscription>,
    /// The current status of the client.
    status: ClientStatus,
    /// The logging type to be used.
    logging: LogType,
    /// The sending half of the channel that carries subscribe/unsubscribe requests. It is public
    /// so that code outside the client can submit `SubscriptionRequest`s.
    pub subscription_sender: Sender<SubscriptionRequest>,
    /// The receiving half of the channel that carries subscribe/unsubscribe requests.
    subscription_receiver: Receiver<SubscriptionRequest>,
    /// Connection manager for automatic reconnection. `None` until one has been installed
    /// (`connect` installs one when automatic reconnection is enabled).
    connection_manager: Option<Arc<ConnectionManager>>,
    /// Configuration for reconnection behavior.
    reconnection_config: ReconnectionConfig,
    /// Configuration for heartbeat monitoring.
    heartbeat_config: HeartbeatConfig,
    /// Whether automatic reconnection is enabled.
    auto_reconnect_enabled: bool,
}
106
107impl Debug for LightstreamerClient {
108    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
109        f.debug_struct("LightstreamerClient")
110            .field("server_address", &self.server_address)
111            .field("adapter_set", &self.adapter_set)
112            .field("connection_details", &self.connection_details)
113            .field("connection_options", &self.connection_options)
114            .field("listeners", &self.listeners)
115            .field("subscriptions", &self.subscriptions)
116            .finish()
117    }
118}
119
120impl LightstreamerClient {
121    /// A constant string representing the name of the library.
122    pub const LIB_NAME: &'static str = "rust_client";
123
124    /// A constant string representing the version of the library.
125    pub const LIB_VERSION: &'static str = "0.1.0";
126
127    //
128    // Constants for WebSocket connection.
129    //
130    /// WebSocket key used in the WebSocket handshake process.
131    /// This is a base64-encoded value that is used to establish the WebSocket connection.
132    pub const SEC_WEBSOCKET_KEY: &'static str = "PNDUibe9ex7PnsrLbt0N4w==";
133
134    /// WebSocket protocol identifier for the TLCP protocol used by Lightstreamer.
135    /// This identifies the specific subprotocol used over the WebSocket connection.
136    pub const SEC_WEBSOCKET_PROTOCOL: &'static str = "TLCP-2.4.0.lightstreamer.com";
137
138    /// WebSocket version used for the connection.
139    /// Version 13 is the standard version used in modern WebSocket implementations.
140    pub const SEC_WEBSOCKET_VERSION: &'static str = "13";
141
142    /// WebSocket upgrade header value.
143    /// Used to indicate that the client wishes to upgrade from HTTP to WebSocket protocol.
144    pub const SEC_WEBSOCKET_UPGRADE: &'static str = "websocket";
145
146    /// A constant string representing the version of the TLCP protocol used by the library.
147    pub const TLCP_VERSION: &'static str = "TLCP-2.4.0";
148
    /// Placeholder for sharing cookies between connections to the Server (performed by this
    /// library) and connections to other sites that are performed by the application.
    ///
    /// **This function is not implemented yet.** The intended contract, once implemented, is
    /// that cookies received by the application are added (or replaced if already present) to the
    /// cookie set used by the library to access the Server, and that only cookies whose domain is
    /// compatible with the Server domain are used internally. It is meant to be invoked before
    /// connecting, although invoking it later should affect the next HTTP request or WebSocket
    /// establishment.
    ///
    /// # Parameters
    ///
    /// * `_uri`: the URI from which the supplied cookies were received. Currently unused.
    /// * `_cookies`: the cookie to add, as a `cookie::Cookie`. Currently unused.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is a bare `unimplemented!` invocation.
    pub fn add_cookies(_uri: &str, _cookies: &Cookie) {
        // No cookie store exists yet; fail loudly instead of silently dropping the cookie.
        unimplemented!("Implement mechanism to add cookies to LightstreamerClient");
    }
169
    /// Adds a listener that will receive events from the `LightstreamerClient` instance.
    ///
    /// A listener can be added at any time. The listener is appended to the end of the internal
    /// listener list unconditionally: no check for duplicates is performed, so a listener that is
    /// added twice is stored twice.
    ///
    /// Because the listener is taken as an owned `Box`, the same boxed value cannot be handed to
    /// several `LightstreamerClient` instances; box a separate listener for each client.
    ///
    /// # Parameters
    ///
    /// * `listener`: An object that will receive the events as documented in the `ClientListener`
    ///   interface.
    pub fn add_listener(&mut self, listener: Box<dyn ClientListener>) {
        self.listeners.push(listener);
    }
185
186    /// Packs s string with the necessary parameters for a subscription request.
187    ///
188    /// # Parameters
189    ///
190    /// * `subscription`: The subscription for which to get the parameters.
191    /// * `request_id`: The request ID to use in the parameters.
192    ///
193    fn get_subscription_params(
194        subscription: &Subscription,
195        request_id: usize,
196    ) -> Result<String, Box<dyn Error + Send + Sync>> {
197        let ls_req_id = request_id.to_string();
198        let ls_sub_id = subscription.id.to_string();
199        let ls_mode = subscription.get_mode().to_string();
200        let ls_group = match subscription.get_item_group() {
201            Some(item_group) => item_group.to_string(),
202            None => match subscription.get_items() {
203                Some(items) => items.join(" "),
204                None => {
205                    return Err(Box::new(std::io::Error::new(
206                        std::io::ErrorKind::InvalidData,
207                        "No item group or items found in subscription.",
208                    )));
209                }
210            },
211        };
212        let ls_schema = match subscription.get_field_schema() {
213            Some(field_schema) => field_schema.to_string(),
214            None => match subscription.get_fields() {
215                Some(fields) => fields.join(" "),
216                None => {
217                    return Err(Box::new(std::io::Error::new(
218                        std::io::ErrorKind::InvalidData,
219                        "No field schema or fields found in subscription.",
220                    )));
221                }
222            },
223        };
224        let ls_data_adapter = match subscription.get_data_adapter() {
225            Some(data_adapter) => data_adapter.to_string(),
226            None => "".to_string(),
227        };
228        let ls_snapshot = subscription
229            .get_requested_snapshot()
230            .unwrap_or_default()
231            .to_string();
232        //
233        // Prepare the subscription request.
234        //
235        let mut params: Vec<(&str, &str)> = vec![
236            ("LS_data_adapter", &ls_data_adapter),
237            ("LS_reqId", &ls_req_id),
238            ("LS_op", "add"),
239            ("LS_subId", &ls_sub_id),
240            ("LS_mode", &ls_mode),
241            ("LS_group", &ls_group),
242            ("LS_schema", &ls_schema),
243            ("LS_ack", "false"),
244        ];
245        // Remove the data adapter parameter if not specified.
246        if ls_data_adapter.is_empty() {
247            params.remove(0);
248        }
249        if !ls_snapshot.is_empty() {
250            params.push(("LS_snapshot", &ls_snapshot));
251        }
252
253        Ok(serde_urlencoded::to_string(&params)?)
254    }
255
256    fn get_unsubscription_params(
257        subscription_id: usize,
258        request_id: usize,
259    ) -> Result<String, Box<dyn Error + Send + Sync>> {
260        let ls_req_id = request_id.to_string();
261        let ls_sub_id = subscription_id.to_string();
262        //
263        // Prepare the unsubscription request.
264        //
265        let params: Vec<(&str, &str)> = vec![
266            ("LS_reqId", &ls_req_id),
267            ("LS_op", "delete"),
268            ("LS_subId", &ls_sub_id),
269        ];
270
271        Ok(serde_urlencoded::to_string(&params)?)
272    }
273
274    /// Operation method that requests to open a Session against the configured Lightstreamer Server.
275    ///
276    /// When `connect()` is called, unless a single transport was forced through `ConnectionOptions.setForcedTransport()`,
277    /// the so called "Stream-Sense" mechanism is started: if the client does not receive any answer
278    /// for some seconds from the streaming connection, then it will automatically open a polling
279    /// connection.
280    ///
281    /// A polling connection may also be opened if the environment is not suitable for a streaming
282    /// connection.
283    ///
284    /// Note that as "polling connection" we mean a loop of polling requests, each of which requires
285    /// opening a synchronous (i.e. not streaming) connection to Lightstreamer Server.
286    ///
287    /// Note that the request to connect is accomplished by the client in a separate thread; this
288    /// means that an invocation to `getStatus()` right after `connect()` might not reflect the change
289    /// yet.
290    ///
291    /// When the request to connect is finally being executed, if the current status of the client
292    /// is not `DISCONNECTED`, then nothing will be done.
293    ///
294    /// # Raises
295    ///
296    /// * `IllegalStateException`: if no server address was configured.
297    ///
298    /// See also `getStatus()`
299    ///
300    /// See also `disconnect()`
301    ///
302    /// See also `ClientListener.onStatusChange()`
303    ///
304    /// See also `ConnectionDetails.setServerAddress()`
305    #[instrument(level = "trace")]
306    pub async fn connect(
307        client: Arc<Mutex<Self>>,
308        shutdown_signal: Arc<Notify>,
309    ) -> Result<(), Box<dyn Error + Send + Sync>> {
310        // If auto-reconnect is enabled, use ConnectionManager
311        let auto_reconnect_enabled = {
312            let client_guard = client.lock().await;
313            client_guard.auto_reconnect_enabled
314        };
315
316        if auto_reconnect_enabled {
317            let (reconnection_config, heartbeat_config) = {
318                let client_guard = client.lock().await;
319                (
320                    client_guard.reconnection_config.clone(),
321                    client_guard.heartbeat_config.clone(),
322                )
323            };
324
325            let weak_client = Arc::downgrade(&client);
326            let connection_manager: Arc<ConnectionManager> =
327                ConnectionManager::new(weak_client, reconnection_config, heartbeat_config);
328
329            {
330                let mut client_guard = client.lock().await;
331                client_guard.connection_manager = Some(connection_manager.clone());
332            }
333
334            // Use direct connection method
335            return client.lock().await.connect_direct(shutdown_signal).await;
336        }
337
338        // Fallback to direct connection without auto-reconnect
339        let mut client_guard = client.lock().await;
340        client_guard.connect_direct(shutdown_signal).await
341    }
342
343    /// Direct connection method without automatic reconnection
344    #[instrument(level = "trace")]
345    pub async fn connect_direct(
346        &mut self,
347        shutdown_signal: Arc<Notify>,
348    ) -> Result<(), Box<dyn Error + Send + Sync>> {
349        // Check if the server address is configured.
350        if self.server_address.is_none() {
351            return Err(Box::new(IllegalStateException::new(
352                "No server address was configured.",
353            )));
354        }
355        //
356        // Only WebSocket streaming transport is currently supported.
357        //
358        let forced_transport = self.connection_options.get_forced_transport();
359        if forced_transport.is_none()
360            || *forced_transport.unwrap() /* unwrap() is safe here */ != Transport::WsStreaming
361        {
362            return Err(Box::new(IllegalStateException::new(
363                "Only WebSocket streaming transport is currently supported.",
364            )));
365        }
366        //
367        // Convert the HTTP URL to a WebSocket URL.
368        //
369        let http_url = self.connection_details.get_server_address().unwrap(); // unwrap() is safe here.
370        let mut url = Url::parse(http_url)
371            .expect("Failed to parse server address URL from connection details.");
372        match url.scheme() {
373            "http" => url
374                .set_scheme("ws")
375                .expect("Failed to set scheme to ws for WebSocket URL."),
376            "https" => url
377                .set_scheme("wss")
378                .expect("Failed to set scheme to wss for WebSocket URL."),
379            invalid_scheme => {
380                return Err(Box::new(IllegalStateException::new(&format!(
381                    "Unsupported scheme '{}' found when converting HTTP URL to WebSocket URL.",
382                    invalid_scheme
383                ))));
384            }
385        }
386        let ws_url = url.as_str();
387
388        // Build the WebSocket request with the necessary headers.
389        let request = Request::builder()
390            .uri(ws_url)
391            .header(
392                HeaderName::from_static("connection"),
393                HeaderValue::from_static("Upgrade"),
394            )
395            .header(
396                HeaderName::from_static("host"),
397                HeaderValue::from_str(url.host_str().unwrap_or("localhost")).map_err(|err| {
398                    IllegalStateException::new(&format!(
399                        "Invalid header value for header with name 'host': {}",
400                        err
401                    ))
402                })?,
403            )
404            .header(
405                HeaderName::from_static("sec-websocket-key"),
406                HeaderValue::from_static(Self::SEC_WEBSOCKET_KEY),
407            )
408            .header(
409                HeaderName::from_static("sec-websocket-protocol"),
410                HeaderValue::from_static(Self::SEC_WEBSOCKET_PROTOCOL),
411            )
412            .header(
413                HeaderName::from_static("sec-websocket-version"),
414                HeaderValue::from_static(Self::SEC_WEBSOCKET_VERSION),
415            )
416            .header(
417                HeaderName::from_static("upgrade"),
418                HeaderValue::from_static(Self::SEC_WEBSOCKET_UPGRADE),
419            )
420            .body(())?;
421
422        // Connect to the Lightstreamer server using WebSocket.
423        let ws_stream = match connect_async(request).await {
424            Ok((ws_stream, response)) => {
425                if let Some(server_header) = response.headers().get("server") {
426                    self.make_log(
427                        Level::INFO,
428                        &format!(
429                            "Connected to Lightstreamer server: {}",
430                            server_header.to_str().unwrap_or("")
431                        ),
432                    );
433                } else {
434                    self.make_log(Level::INFO, "Connected to Lightstreamer server");
435                }
436                ws_stream
437            }
438            Err(err) => {
439                return Err(Box::new(std::io::Error::new(
440                    std::io::ErrorKind::ConnectionRefused,
441                    format!(
442                        "Failed to connect to Lightstreamer server with WebSocket: {}",
443                        err
444                    ),
445                )));
446            }
447        };
448
449        // Split the WebSocket stream into a write and a read stream.
450        let (mut write_stream, mut read_stream) = ws_stream.split();
451
452        //
453        // Initiate communication with the server by sending a 'wsok' message.
454        //
455        write_stream.send(Message::Text("wsok".into())).await?;
456
457        //
458        // Start reading and processing messages from the server.
459        //
460        let mut is_connected = false;
461        let mut request_id: usize = 0;
462        let mut _session_id: Option<String> = None;
463        let mut subscription_id: usize = 0;
464        let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> =
465            HashMap::new();
466        loop {
467            tokio::select! {
468                message = read_stream.next() => {
469                    match message {
470                        Some(Ok(Message::Text(text))) => {
471                            // Messages could include multiple submessages separated by /r/n.
472                            // Split the message into submessages and process each one separately.
473                            let submessages: Vec<&str> = text.split("\r\n")
474                                .filter(|&line| !line.trim().is_empty()) // Filter out empty lines.
475                                .collect();
476                            for submessage in submessages {
477                                let clean_text = clean_message(submessage);
478                                let submessage_fields: Vec<&str> = clean_text.split(",").collect();
479                                match *submessage_fields.first().unwrap_or(&"") {
480                                    //
481                                    // Errors from server.
482                                    //
483                                    "conerr" | "reqerr" => {
484                                        self.make_log( Level::ERROR, &format!("Received connection error from Lightstreamer server: {}", clean_text) );
485                                        break;
486                                    },
487                                    //
488                                    // Session created successfully.
489                                    //
490                                    "conok" => {
491                                        is_connected = true;
492                                        if let Some(session_id) = submessage_fields.get(1) {
493                                            self.make_log( Level::DEBUG, &format!("Session creation confirmed by server: {}", clean_text) );
494                                            self.make_log( Level::DEBUG, &format!("Session created with ID: {:?}", session_id) );
495                                            //
496                                            // Subscribe to the desired items.
497                                            //
498                                            while let Some(subscription) = self.subscriptions.get_mut(subscription_id) {
499                                                //
500                                                // Gather all the necessary subscription parameters.
501                                                //
502                                                subscription_id += 1;
503                                                request_id += 1;
504                                                subscription.id = subscription_id;
505                                                subscription.id_sender.try_send(subscription_id)?;
506
507                                                let encoded_params = match Self::get_subscription_params(subscription, request_id)
508                                                {
509                                                    Ok(params) => params,
510                                                    Err(err) => {
511                                                        return Err(err);
512                                                    },
513                                                };
514
515                                                write_stream
516                                                    .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
517                                                    .await?;
518                                                debug!("Sent subscription request: '{}'", encoded_params);
519                                            }
520                                        } else {
521                                            return Err(Box::new(std::io::Error::new(
522                                                std::io::ErrorKind::InvalidData,
523                                                "Session ID not found in 'conok' message from server",
524                                            )));
525                                        }
526                                    },
527                                    //
528                                    // Notifications from server.
529                                    //
530                                    "conf" | "cons" | "clientip" | "servname" | "prog" | "sync" | "eos" => {
531                                        self.make_log( Level::INFO, &format!("Received notification from server: {}", clean_text) );
532                                        // Don't do anything with these notifications for now.
533                                    },
534                                    "probe" => {
535                                        self.make_log( Level::DEBUG, &format!("Received probe message from server: {}", clean_text ) );
536                                    },
537                                    "reqok" => {
538                                        self.make_log( Level::DEBUG, &format!("Received reqok message from server: '{}'", clean_text ) );
539                                    },
540                                    //
541                                    // Subscription confirmation from server.
542                                    //
543                                    "subok" => {
544                                        self.make_log( Level::INFO, &format!("Subscription confirmed by server: '{}'", clean_text) );
545                                    },
546                                    //
547                                    // Usubscription confirmation from server.
548                                    //
549                                    "unsub" => {
550                                        self.make_log( Level::INFO, &format!("Unsubscription confirmed by server: '{}'", clean_text) );
551                                    },
552                                    //
553                                    // Data updates from server.
554                                    //
555                                    "u" => {
556                                        // Parse arguments from the received message.
557                                        let arguments = parse_arguments(&clean_text);
558                                        //
559                                        // Extract the subscription from the first argument.
560                                        //
561                                        let subscription_index = arguments.get(1).unwrap_or(&"").parse::<usize>().unwrap_or(0);
562                                        let subscription = match get_subscription_by_id(self.get_subscriptions(), subscription_index) {
563                                            Some(subscription) => subscription,
564                                            None => {
565                                                self.make_log( Level::WARN, &format!("Subscription not found for index: {}", subscription_index) );
566                                                continue;
567
568                                            }
569                                        };
570                                        //
571                                        // Extract the item from the second argument.
572                                        //
573                                        let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0);
574                                        let item = match subscription.get_items() {
575                                            Some(items) => items.get(item_index-1),
576                                            None => {
577                                                self.make_log( Level::WARN, &format!("No items found in subscription: {:?}", subscription) );
578                                                continue;
579                                            }
580                                        };
581                                        //
582                                        // Determine if the update is a snapshot or real-time update based on the subscription parameters.
583                                        //
584                                        let is_snapshot = match subscription.get_requested_snapshot() {
585                                            Some(ls_snapshot) => {
586                                                match ls_snapshot {
587                                                    Snapshot::No => false,
588                                                    Snapshot::Yes => {
589                                                        match subscription.get_mode() {
590                                                            SubscriptionMode::Merge => {
591                                                                if arguments.len() == 4 && arguments[3] == "$" {
592                                                                    // EOS notification received
593                                                                    true
594                                                                } else {
595                                                                    // If item doesn't exist in item_updates yet, the first update
596                                                                    // is always a snapshot.
597                                                                    if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) {
598                                                                        if item_updates.get(&(item_index)).is_some() {
599                                                                            // Item update already exists in item_updates, so it's not a snapshot.
600                                                                            false
601                                                                        } else {
602                                                                            // Item update doesn't exist in item_updates, so the first update is always a snapshot.
603                                                                            true
604                                                                        }
605                                                                    } else {
606                                                                        // Item updates not found for subscription, so the first update is always a snapshot.
607                                                                        true
608                                                                    }
609                                                                }
610                                                            },
611                                                            SubscriptionMode::Distinct | SubscriptionMode::Command => {
612                                                                !subscription.is_subscribed()
613                                                            },
614                                                            _ => false,
615                                                        }
616                                                    },
617                                                    _ => false,
618                                                }
619                                            },
620                                            None => false,
621                                        };
622
623                                        // Extract the field values from the third argument.
624                                        let field_values: Vec<&str> = arguments.get(3).unwrap_or(&"").split('|').collect();
625
626                                        //
627                                        // Get fields from subscription and create a HashMap of field names and values.
628                                        //
629                                        let subscription_fields = subscription.get_fields();
630                                        let mut field_map: HashMap<String, Option<String>> = subscription_fields
631                                            .map(|fields| fields.iter().map(|field_name| (field_name.to_string(), None)).collect())
632                                            .unwrap_or_default();
633
634                                        let mut field_index = 0;
635                                        for value in field_values {
636                                            match value {
637                                                "" => {
638                                                    // An empty value means the field is unchanged compared to the previous update of the same field.
639                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
640                                                        field_map.insert(field_name.to_string(), None);
641                                                    }
642                                                    field_index += 1;
643                                                }
644                                                "#" | "$" => {
645                                                    // A value corresponding to a hash sign "#" or dollar sign "$" means the field is null or empty.
646                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
647                                                        field_map.insert(field_name.to_string(), Some("".to_string()));
648                                                    }
649                                                    field_index += 1;
650                                                }
651                                                value if value.starts_with('^') => {
652                                                    let command = value.chars().nth(1).unwrap_or(' ');
653                                                    match command {
654                                                        '0'..='9' => {
655                                                            let count = value[1..].parse().unwrap_or(0);
656                                                            for i in 0..count {
657                                                                if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index + i)) {
658                                                                    field_map.insert(field_name.to_string(), None);
659                                                                }
660                                                            }
661                                                            field_index += count;
662                                                        }
663                                                        'P' | 'T' => {
664                                                            let diff_value = serde_urlencoded::from_str(&value[2..]).unwrap_or_else(|_| value[2..].to_string());
665                                                            if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index))
666                                                                && let Some(prev_value) = field_map.get(field_name).and_then(|v| v.as_ref()) {
667                                                                    let new_value = match command {
668                                                                        'P' => {
669                                                                            // Apply JSON Patch
670                                                                            let patch: serde_json::Value = serde_json::from_str(&diff_value).unwrap_or(serde_json::Value::Null);
671                                                                            let mut prev_json: serde_json::Value = serde_json::from_str(prev_value).unwrap_or(serde_json::Value::Null);
672                                                                            let patch_operations: Vec<json_patch::PatchOperation> = serde_json::from_value(patch).unwrap_or_default();
673                                                                            let _ = json_patch::patch(&mut prev_json, &patch_operations);
674                                                                            prev_json.to_string()
675                                                                        }
676                                                                        'T' => {
677                                                                            // Apply TLCP-diff
678                                                                            //tlcp_diff::apply_diff(prev_value, &diff_value).unwrap_or_else(|_| prev_value.to_string())
679                                                                            unimplemented!("Implement TLCP-diff");
680                                                                        }
681                                                                        _ => unreachable!(),
682                                                                    };
683                                                                    field_map.insert(field_name.to_string(), Some(new_value.to_string()));
684                                                                }
685                                                            field_index += 1;
686                                                        }
687                                                        _ => {
688                                                            let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
689                                                            if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
690                                                                field_map.insert(field_name.to_string(), Some(decoded_value));
691                                                            }
692                                                            field_index += 1;
693                                                        }
694                                                    }
695                                                }
696                                                value if value.starts_with('{') => {
697                                                    // in this case it is a json payload that we will let the consumer handle. In this case, it is important
698                                                    // to preserve casing for parsing.
699                                                    let original_json = parse_arguments(submessage).get(3).unwrap_or(&"").split('|').collect::<Vec<&str>>();
700                                                    let mut payload = "";
701                                                    for json in original_json.iter()
702                                                    {
703                                                        if json.is_empty() || *json == "#"
704                                                        {
705                                                            continue;
706                                                        }
707
708                                                        payload = json;
709                                                    }
710
711                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
712                                                        field_map.insert(field_name.to_string(), Some(payload.to_string()));
713                                                    }
714                                                    field_index += 1;
715                                                }
716                                                _ => {
717                                                    let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
718                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
719                                                        field_map.insert(field_name.to_string(), Some(decoded_value));
720                                                    }
721                                                    field_index += 1;
722                                                }
723                                            }
724                                        }
725
726                                        // Store only item_update's changed fields.
727                                        let changed_fields: HashMap<String, String> = field_map.iter()
728                                            .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
729                                            .collect();
730
731                                        //
732                                        // Take the proper item_update from item_updates and update it with changed fields.
733                                        // If the item_update doesn't exist yet, create a new one.
734                                        //
735                                        let current_item_update: ItemUpdate;
736                                        match subscription_item_updates.get_mut(&(subscription_index)) {
737                                            Some(item_updates) => match item_updates.get_mut(&(item_index)) {
738                                                Some(item_update) => {
739                                                    //
740                                                    // Iterate changed_fields and update existing item_update.fields assigning the new values.
741                                                    //
742                                                    for (field_name, new_value) in &changed_fields {
743                                                        if item_update.fields.contains_key(field_name) {
744                                                            item_update.fields.insert((*field_name).clone(), Some(new_value.clone()));
745                                                        }
746                                                    }
747                                                    item_update.changed_fields = changed_fields.clone();
748                                                    item_update.is_snapshot = is_snapshot;
749                                                    current_item_update = item_update.clone();
750                                                },
751                                                None => {
752                                                    // Create a new item_update and add it to item_updates.
753                                                    let item_update = ItemUpdate {
754                                                        item_name: item.cloned(),
755                                                        item_pos: item_index,
756                                                        fields: field_map.clone(),
757                                                        changed_fields: changed_fields.clone(),
758                                                        is_snapshot,
759                                                    };
760                                                    current_item_update = item_update.clone();
761                                                    item_updates.insert(item_index, item_update);
762                                                }
763                                            },
764                                            None => {
765                                                // Create a new item_update and add it to item_updates.
766                                                let item_update = ItemUpdate {
767                                                    item_name: item.cloned(),
768                                                    item_pos: item_index,
769                                                    fields: field_map,
770                                                    changed_fields,
771                                                    is_snapshot,
772                                                };
773                                                current_item_update = item_update.clone();
774                                                let mut item_updates = HashMap::new();
775                                                item_updates.insert(item_index, item_update);
776                                                subscription_item_updates.insert(subscription_index, item_updates);
777                                            }
778                                        };
779
780                                        // Get mutable subscription listeners directly.
781                                        let subscription_listeners = subscription.get_listeners();
782
783                                        // Iterate subscription listeners and call on_item_update for each listener.
784                                        for listener in subscription_listeners {
785                                            listener.on_item_update(&current_item_update);
786                                        }
787                                    }
788                                    //
789                                    // Connection confirmation from server.
790                                    //
791                                    "wsok" => {
792                                        self.make_log( Level::INFO, &format!("Connection confirmed by server: '{}'", clean_text) );
793                                        //
794                                        // Request session creation.
795                                        //
796                                        let ls_adapter_set = match self.connection_details.get_adapter_set() {
797                                            Some(adapter_set) => adapter_set,
798                                            None => {
799                                                return Err(Box::new(IllegalStateException::new(
800                                                    "No adapter set found in connection details.",
801                                                )));
802                                            },
803                                        };
804                                        let ls_send_sync = self.connection_options.get_send_sync().to_string();
805                                        let mut params: Vec<(&str, &str)> = vec![
806                                            ("LS_adapter_set", ls_adapter_set),
807                                            ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"),
808                                            ("LS_send_sync", &ls_send_sync),
809                                        ];
810                                        if let Some(user) = &self.connection_details.get_user() {
811                                            params.push(("LS_user", user));
812                                        }
813                                        if let Some(password) = &self.connection_details.get_password() {
814                                            params.push(("LS_password", password));
815                                        }
816                                        params.push(("LS_protocol", Self::TLCP_VERSION));
817                                        let encoded_params = serde_urlencoded::to_string(&params)?;
818                                        write_stream
819                                            .send(Message::Text(format!("create_session\r\n{}\n", encoded_params).into()))
820                                            .await?;
821                                        self.make_log( Level::DEBUG, &format!("Sent create session request: '{}'", encoded_params) );
822                                    },
823                                    unexpected_message => {
824                                        return Err(Box::new(std::io::Error::new(
825                                            std::io::ErrorKind::InvalidData,
826                                            format!(
827                                                "Unexpected message received from server: '{:?}'",
828                                                unexpected_message
829                                            ),
830                                        )));
831                                    },
832                                }
833                            }
834                        },
835                        Some(Ok(non_text_message)) => {
836                            return Err(Box::new(std::io::Error::new(
837                                std::io::ErrorKind::InvalidData,
838                                format!(
839                                    "Unexpected non-text message from server: {:?}",
840                                    non_text_message
841                                ),
842                            )));
843                        },
844                        Some(Err(err)) => {
845                            return Err(Box::new(std::io::Error::new(
846                                std::io::ErrorKind::InvalidData,
847                                format!("Error reading message from server: {}", err),
848                            )));
849                        },
850                        None => {
851                            self.make_log( Level::DEBUG, "No more messages from server" );
852                            break;
853                        },
854                    }
855                },
856                Some(subscription_request) = self.subscription_receiver.recv() => {
857                    request_id += 1;
858                    // Process subscription requests.
859                    if subscription_request.subscription.is_some()
860                    {
861                        self.subscriptions.push(subscription_request.subscription.unwrap());
862
863                        // if we are not connected yet, we will subscribe later
864                        if !is_connected {
865                            continue;
866                        }
867
868                        subscription_id += 1;
869                        self.subscriptions.last_mut().unwrap().id = subscription_id;
870                        self.subscriptions.last().unwrap().id_sender.try_send(subscription_id)?;
871
872                        let encoded_params = match Self::get_subscription_params(self.subscriptions.last().unwrap(), request_id)
873                        {
874                            Ok(params) => params,
875                            Err(err) => {
876                                return Err(err);
877                            },
878                        };
879
880                        write_stream
881                            .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
882                            .await?;
883
884                        self.make_log( Level::INFO, &format!("Sent subscription request: '{}'", encoded_params) );
885                    }
886                    // Process unsubscription requests.
887                    else if subscription_request.subscription_id.is_some()
888                    {
889                        let unsubscription_id = subscription_request.subscription_id.unwrap();
890                        let encoded_params = match Self::get_unsubscription_params(unsubscription_id, request_id)
891                        {
892                            Ok(params) => params,
893                            Err(err) => {
894                                return Err(err);
895                            },
896                        };
897
898                        write_stream
899                            .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
900                            .await?;
901
902                        self.make_log( Level::INFO, &format!("Sent unsubscription request: '{}'", encoded_params) );
903
904                        self.subscriptions.retain(|s| s.id != unsubscription_id);
905
906                        if self.subscriptions.is_empty()
907                        {
908                            self.make_log( Level::INFO, "No more subscriptions, disconnecting" );
909                            shutdown_signal.notify_one();
910                        }
911                    }
912                },
913                _ = shutdown_signal.notified() => {
914                    self.make_log( Level::INFO, "Received shutdown signal" );
915                    break;
916                },
917            }
918        }
919
920        Ok(())
921    }
922
923    /// Operation method that requests to close the Session opened against the configured Lightstreamer
924    /// Server (if any).
925    ///
926    /// When `disconnect()` is called, the "Stream-Sense" mechanism is stopped.
927    ///
928    /// Note that active `Subscription` instances, associated with this `LightstreamerClient` instance,
929    /// are preserved to be re-subscribed to on future Sessions.
930    ///
931    /// Note that the request to disconnect is accomplished by the client in a separate thread; this
932    /// means that an invocation to `getStatus()` right after `disconnect()` might not reflect the
933    /// change yet.
934    ///
935    /// When the request to disconnect is finally being executed, if the status of the client is
936    /// "DISCONNECTED", then nothing will be done.
937    ///
938    /// See also `connect()`
939    #[instrument(level = "trace")]
940    pub async fn disconnect(&mut self) {
941        self.make_log(Level::INFO, "Disconnecting from Lightstreamer server");
942
943        // If auto-reconnect is enabled and we have a connection manager, stop it
944        if self.auto_reconnect_enabled {
945            if let Some(ref manager) = self.connection_manager {
946                manager.shutdown().await;
947            }
948            self.connection_manager = None;
949        }
950
951        // Update status to disconnected
952        self.status = ClientStatus::Disconnected(DisconnectionType::WillRetry);
953
954        // Notify listeners about status change
955        for listener in &self.listeners {
956            listener.on_status_change(&self.status.to_string());
957        }
958    }
959
    /// Static inquiry method meant to share cookies between connections to the Server (performed
    /// by this library) and connections to other sites that are performed by the application.
    /// The intent is that cookies received from the Server can be extracted for sending through
    /// other connections, according to the URI to be accessed.
    ///
    /// This method is currently a stub: its body is a bare `unimplemented!()`, so no cookie is
    /// ever looked up or returned.
    ///
    /// See `addCookies()` for clarifications on when cookies are directly stored by the library and
    /// when not.
    ///
    /// # Parameters
    ///
    /// * `_uri`: the URI to which the cookies should be sent, or `None`. Not read by the current
    ///   stub.
    ///
    /// # Returns
    ///
    /// The signature declares a single `Cookie`. NOTE(review): the description inherited from the
    /// reference API speaks of a list of cookies (all non-expired ones when the URI is `None`);
    /// confirm the intended return type when this is implemented.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `unimplemented!()`.
    pub fn get_cookies(_uri: Option<&str>) -> Cookie<'_> {
        // Stub: nothing is looked up here; the macro below panics on every call.
        unimplemented!()
    }
980
981    /// Returns a list containing the `ClientListener` instances that were added to this client.
982    ///
983    /// # Returns
984    ///
985    /// A list containing the listeners that were added to this client.
986    ///
987    /// See also `addListener()`
988    pub fn get_listeners(&self) -> &Vec<Box<dyn ClientListener>> {
989        &self.listeners
990    }
991
992    /// Inquiry method that gets the current client status and transport (when applicable).
993    ///
994    /// # Returns
995    ///
996    /// The current client status. It can be one of the following values:
997    ///
998    /// - `"CONNECTING"`: the client is waiting for a Server's response in order to establish a connection;
999    /// - `"CONNECTED:STREAM-SENSING"`: the client has received a preliminary response from the server
1000    ///   and is currently verifying if a streaming connection is possible;
1001    /// - `"CONNECTED:WS-STREAMING"`: a streaming connection over WebSocket is active;
1002    /// - `"CONNECTED:HTTP-STREAMING"`: a streaming connection over HTTP is active;
1003    /// - `"CONNECTED:WS-POLLING"`: a polling connection over WebSocket is in progress;
1004    /// - `"CONNECTED:HTTP-POLLING"`: a polling connection over HTTP is in progress;
1005    /// - `"STALLED"`: the Server has not been sending data on an active streaming connection for
1006    ///   longer than a configured time;
1007    /// - `"DISCONNECTED:WILL-RETRY"`: no connection is currently active but one will be opened
1008    ///   (possibly after a timeout);
1009    /// - `"DISCONNECTED:TRYING-RECOVERY"`: no connection is currently active, but one will be opened
1010    ///   as soon as possible, as an attempt to recover the current session after a connection issue;
1011    /// - `"DISCONNECTED"`: no connection is currently active.
1012    ///
1013    /// See also `ClientListener.onStatusChange()`
1014    pub fn get_status(&self) -> &ClientStatus {
1015        &self.status
1016    }
1017
1018    /// Inquiry method that returns a list containing all the `Subscription` instances that are
1019    /// currently "active" on this `LightstreamerClient`.
1020    ///
1021    /// Internal second-level `Subscription` are not included.
1022    ///
1023    /// # Returns
1024    ///
1025    /// A list, containing all the `Subscription` currently "active" on this `LightstreamerClient`.
1026    /// The list can be empty.
1027    ///
1028    /// See also `subscribe()`
1029    pub fn get_subscriptions(&self) -> &Vec<Subscription> {
1030        &self.subscriptions
1031    }
1032
1033    /// Creates a new instance of `LightstreamerClient`.
1034    ///
1035    /// The constructor initializes the client with the server address and adapter set, if provided.
1036    /// It sets up the connection details and options for the client. If no server address or
1037    /// adapter set is specified, those properties on the client will be `None`. This allows
1038    /// for late configuration of these details before connecting to the Lightstreamer server.
1039    ///
1040    /// # Arguments
1041    /// * `server_address` - An optional reference to a string slice that represents the server
1042    ///   address to connect to. If `None`, the server address must be set later.
1043    /// * `adapter_set` - An optional reference to a string slice that specifies the adapter set name.
1044    ///   If `None`, the adapter set must be set later.
1045    ///
1046    /// # Returns
1047    /// A result containing the new `LightstreamerClient` instance if successful, or an
1048    /// `IllegalStateException` if the initialization fails due to invalid state conditions.
1049    ///
1050    /// # Panics
1051    /// Does not panic under normal circumstances. However, unexpected internal errors during
1052    /// the creation of internal components could cause panics, which should be considered when
1053    /// using this function in production code.
1054    ///
1055    pub fn new(
1056        server_address: Option<&str>,
1057        adapter_set: Option<&str>,
1058        username: Option<&str>,
1059        password: Option<&str>,
1060    ) -> Result<LightstreamerClient, Box<dyn Error>> {
1061        let connection_details =
1062            ConnectionDetails::new(server_address, adapter_set, username, password)?;
1063        let connection_options = ConnectionOptions::default();
1064        let (subscription_sender, subscription_receiver) = channel(100);
1065
1066        Ok(LightstreamerClient {
1067            server_address: server_address.map(|s| s.to_string()),
1068            adapter_set: adapter_set.map(|s| s.to_string()),
1069            connection_details,
1070            connection_options,
1071            listeners: Vec::new(),
1072            subscriptions: Vec::new(),
1073            status: ClientStatus::Disconnected(DisconnectionType::WillRetry),
1074            logging: LogType::StdLogs,
1075            subscription_sender,
1076            subscription_receiver,
1077            connection_manager: None,
1078            reconnection_config: ReconnectionConfig::default(),
1079            heartbeat_config: HeartbeatConfig::default(),
1080            auto_reconnect_enabled: false,
1081        })
1082    }
1083
    /// Intended to remove a listener from the `LightstreamerClient` instance so that it will not
    /// receive events anymore.
    ///
    /// This method is currently a stub: its body is an `unimplemented!` call, so the listener
    /// list is never modified.
    ///
    /// # Parameters
    ///
    /// * `_listener`: The listener to be removed. Not read by the current stub.
    ///
    /// # Panics
    ///
    /// Always panics with the message
    /// "Implement mechanism to remove listener from LightstreamerClient".
    ///
    /// See also `addListener()`
    pub fn remove_listener(&mut self, _listener: Box<dyn ClientListener>) {
        unimplemented!("Implement mechanism to remove listener from LightstreamerClient");
        // TODO: earlier sketch of the removal was `self.listeners.remove(&listener)`; presumably
        // listeners first need some notion of identity to be found in the list — confirm.
    }
1098
1099    /// Operation method that sends a message to the Server. The message is interpreted and handled
1100    /// by the Metadata Adapter associated to the current Session. This operation supports in-order
1101    /// guaranteed message delivery with automatic batching. In other words, messages are guaranteed
1102    /// to arrive exactly once and respecting the original order, whatever is the underlying transport
1103    /// (HTTP or WebSockets). Furthermore, high frequency messages are automatically batched, if
1104    /// necessary, to reduce network round trips.
1105    ///
1106    /// Upon subsequent calls to the method, the sequential management of the involved messages is
1107    /// guaranteed. The ordering is determined by the order in which the calls to `sendMessage` are
1108    /// issued.
1109    ///
1110    /// If a message, for any reason, doesn't reach the Server (this is possible with the HTTP transport),
1111    /// it will be resent; however, this may cause the subsequent messages to be delayed. For this
1112    /// reason, each message can specify a "delayTimeout", which is the longest time the message,
1113    /// after reaching the Server, can be kept waiting if one of more preceding messages haven't
1114    /// been received yet. If the "delayTimeout" expires, these preceding messages will be discarded;
1115    /// any discarded message will be notified to the listener through `ClientMessageListener.onDiscarded()`.
1116    /// Note that, because of the parallel transport of the messages, if a zero or very low timeout
1117    /// is set for a message and the previous message was sent immediately before, it is possible
1118    /// that the latter gets discarded even if no communication issues occur. The Server may also
1119    /// enforce its own timeout on missing messages, to prevent keeping the subsequent messages for
1120    /// long time.
1121    ///
1122    /// Sequence identifiers can also be associated with the messages. In this case, the sequential
1123    /// management is restricted to all subsets of messages with the same sequence identifier associated.
1124    ///
1125    /// Notifications of the operation outcome can be received by supplying a suitable listener. The
1126    /// supplied listener is guaranteed to be eventually invoked; listeners associated with a sequence
1127    /// are guaranteed to be invoked sequentially.
1128    ///
1129    /// The "UNORDERED_MESSAGES" sequence name has a special meaning. For such a sequence, immediate
1130    /// processing is guaranteed, while strict ordering and even sequentialization of the processing
1131    /// is not enforced. Likewise, strict ordering of the notifications is not enforced. However,
1132    /// messages that, for any reason, should fail to reach the Server whereas subsequent messages
1133    /// had succeeded, might still be discarded after a server-side timeout, in order to ensure that
1134    /// the listener eventually gets a notification.
1135    ///
1136    /// Moreover, if "UNORDERED_MESSAGES" is used and no listener is supplied, a "fire and forget"
1137    /// scenario is assumed. In this case, no checks on missing, duplicated or overtaken messages
1138    /// are performed at all, so as to optimize the processing and allow the highest possible throughput.
1139    ///
1140    /// Since a message is handled by the Metadata Adapter associated to the current connection, a
1141    /// message can be sent only if a connection is currently active. If the special `enqueueWhileDisconnected`
1142    /// flag is specified it is possible to call the method at any time and the client will take
1143    /// care of sending the message as soon as a connection is available, otherwise, if the current
1144    /// status is "DISCONNECTED*", the message will be abandoned and the `ClientMessageListener.onAbort()`
1145    /// event will be fired.
1146    ///
1147    /// Note that, in any case, as soon as the status switches again to "DISCONNECTED*", any message
1148    /// still pending is aborted, including messages that were queued with the `enqueueWhileDisconnected`
1149    /// flag set to `true`.
1150    ///
1151    /// Also note that forwarding of the message to the server is made in a separate thread, hence,
1152    /// if a message is sent while the connection is active, it could be aborted because of a subsequent
1153    /// disconnection. In the same way a message sent while the connection is not active might be
1154    /// sent because of a subsequent connection.
1155    ///
1156    /// # Parameters
1157    ///
1158    /// * `message`: a text message, whose interpretation is entirely demanded to the Metadata Adapter
1159    ///   associated to the current connection.
1160    /// * `sequence`: an alphanumeric identifier, used to identify a subset of messages to be managed
1161    ///   in sequence; underscore characters are also allowed. If the "UNORDERED_MESSAGES" identifier
1162    ///   is supplied, the message will be processed in the special way described above. The parameter
1163    ///   is optional; if set to `None`, "UNORDERED_MESSAGES" is used as the sequence name.
1164    /// * `delay_timeout`: a timeout, expressed in milliseconds. If higher than the Server configured
1165    ///   timeout on missing messages, the latter will be used instead. The parameter is optional; if
1166    ///   a negative value is supplied, the Server configured timeout on missing messages will be applied.
1167    ///   This timeout is ignored for the special "UNORDERED_MESSAGES" sequence, although a server-side
1168    ///   timeout on missing messages still applies.
1169    /// * `listener`: an object suitable for receiving notifications about the processing outcome. The
1170    ///   parameter is optional; if not supplied, no notification will be available.
1171    /// * `enqueue_while_disconnected`: if this flag is set to `true`, and the client is in a disconnected
1172    ///   status when the provided message is handled, then the message is not aborted right away but
1173    ///   is queued waiting for a new session. Note that the message can still be aborted later when
1174    ///   a new session is established.
1175    pub fn send_message(
1176        &mut self,
1177        message: &str,
1178        sequence: Option<&str>,
1179        _delay_timeout: Option<u64>,
1180        listener: Option<Box<dyn ClientMessageListener>>,
1181        enqueue_while_disconnected: bool,
1182    ) {
1183        let _sequence = sequence.unwrap_or("UNORDERED_MESSAGES");
1184
1185        // Handle the message based on the current connection status
1186        match &self.status {
1187            ClientStatus::Connected(_connection_type) => {
1188                // Send the message to the server in a separate thread
1189                // ...
1190            }
1191            ClientStatus::Disconnected(_disconnection_type) => {
1192                if enqueue_while_disconnected {
1193                    // Enqueue the message to be sent when a connection is available
1194                    // ...
1195                } else {
1196                    // Abort the message and notify the listener
1197                    if let Some(listener) = listener {
1198                        listener.on_abort(message, false);
1199                    }
1200                }
1201            }
1202            _ => {
1203                // Enqueue the message to be sent when a connection is available
1204                // ...
1205            }
1206        }
1207        unimplemented!("Complete mechanism to send message to LightstreamerClient.");
1208    }
1209
1210    /// Static method that permits to configure the logging system used by the library. The logging
1211    /// system must respect the `LoggerProvider` interface. A custom class can be used to wrap any
1212    /// third-party logging system.
1213    ///
1214    /// If no logging system is specified, all the generated log is discarded.
1215    ///
1216    /// The following categories are available to be consumed:
1217    ///
1218    /// - `lightstreamer.stream`: logs socket activity on Lightstreamer Server connections; at INFO
1219    ///   level, socket operations are logged; at DEBUG level, read/write data exchange is logged.
1220    /// - `lightstreamer.protocol`: logs requests to Lightstreamer Server and Server answers; at INFO
1221    ///   level, requests are logged; at DEBUG level, request details and events from the Server are logged.
1222    /// - `lightstreamer.session`: logs Server Session lifecycle events; at INFO level, lifecycle events
1223    ///   are logged; at DEBUG level, lifecycle event details are logged.
1224    /// - `lightstreamer.subscriptions`: logs subscription requests received by the clients and the related
1225    ///   updates; at WARN level, alert events from the Server are logged; at INFO level, subscriptions
1226    ///   and unsubscriptions are logged; at DEBUG level, requests batching and update details are logged.
1227    /// - `lightstreamer.actions`: logs settings / API calls.
1228    ///
1229    /// # Parameters
1230    ///
1231    /// * `provider`: A `LoggerProvider` instance that will be used to generate log messages by the
1232    ///   library classes.
1233    pub fn set_logger_provider() {
1234        unimplemented!("Implement mechanism to set logger provider for LightstreamerClient.");
1235    }
1236    /*
1237    pub fn set_logger_provider(provider: LoggerProvider) {
1238        // Implementation for set_logger_provider
1239    }
1240    */
1241
1242    /// Provides a mean to control the way TLS certificates are evaluated, with the possibility to
1243    /// accept untrusted ones.
1244    ///
1245    /// May be called only once before creating any `LightstreamerClient` instance.
1246    ///
1247    /// # Parameters
1248    ///
1249    /// * `factory`: an instance of `ssl.SSLContext`
1250    ///
1251    /// # Raises
1252    ///
1253    /// * `IllegalArgumentException`: if the factory is `None`
1254    /// * `IllegalStateException`: if a factory is already installed
1255    pub fn set_trust_manager_factory() {
1256        unimplemented!("Implement mechanism to set trust manager factory for LightstreamerClient.");
1257    }
1258    /*
1259    pub fn set_trust_manager_factory(factory: Option<SslContext>) -> Result<(), IllegalArgumentException> {
1260        if factory.is_none() {
1261            return Err(IllegalArgumentException::new(
1262                "Factory cannot be None",
1263            ));
1264        }
1265
1266        // Implementation for set_trust_manager_factory
1267        Ok(())
1268    }
1269    */
1270
1271    /// Adds a subscription to the `LightstreamerClient` instance.
1272    ///
1273    /// Active subscriptions are subscribed to through the server as soon as possible (i.e. as soon
1274    /// as there is a session available). Active `Subscription` are automatically persisted across different
1275    /// sessions as long as a related unsubscribe call is not issued.
1276    ///
1277    /// Subscriptions can be given to the `LightstreamerClient` at any time. Once done the `Subscription`
1278    /// immediately enters the "active" state.
1279    ///
1280    /// Once "active", a `Subscription` instance cannot be provided again to a `LightstreamerClient`
1281    /// unless it is first removed from the "active" state through a call to `unsubscribe()`.
1282    ///
1283    /// Also note that forwarding of the subscription to the server is made in a separate thread.
1284    ///
1285    /// A successful subscription to the server will be notified through a `SubscriptionListener.onSubscription()`
1286    /// event.
1287    ///
1288    /// # Parameters
1289    ///
1290    /// * `subsrciption_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1291    /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1292    ///   values.
1293    ///
1294    /// See also `unsubscribe()`
1295    pub async fn subscribe(
1296        subscription_sender: Sender<SubscriptionRequest>,
1297        subscription: Subscription,
1298    ) {
1299        subscription_sender
1300            .send(SubscriptionRequest {
1301                subscription: Some(subscription),
1302                subscription_id: None,
1303            })
1304            .await
1305            .unwrap()
1306    }
1307
1308    /// If you want to be able to unsubscribe from a subscription, you need to keep track of the id
1309    /// of the subscription. This blocking method allows you to wait for the id of the subscription
1310    /// to be returned.
1311    ///
1312    /// # Parameters
1313    ///
1314    /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1315    /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1316    ///
1317    pub async fn subscribe_get_id(
1318        subscription_sender: Sender<SubscriptionRequest>,
1319        mut subscription: Subscription,
1320    ) -> Result<usize, Box<dyn Error + Send + Sync>> {
1321        // Extract the id_receiver before sending the subscription
1322        let mut id_receiver = subscription.id_receiver;
1323
1324        // Create a new channel for the subscription we're about to send
1325        let (_new_sender, new_receiver) = channel(1);
1326        subscription.id_receiver = new_receiver;
1327
1328        // Send the subscription
1329        LightstreamerClient::subscribe(subscription_sender, subscription).await;
1330
1331        // Wait for the ID to be updated through the channel
1332        match id_receiver.recv().await {
1333            Some(id) => Ok(id),
1334            None => Err(Box::new(IllegalStateException::new(
1335                "Failed to get subscription id",
1336            ))),
1337        }
1338    }
1339
1340    /*
1341
1342        pub async fn subscribe_get_id(
1343        subscription_sender: Sender<SubscriptionRequest>,
1344        subscription: Subscription,
1345    ) -> Result<usize, Box<dyn Error + Send + Sync>> {
1346        let mut id_receiver = subscription.id_receiver.clone();
1347        LightstreamerClient::subscribe(subscription_sender.clone(), subscription);
1348
1349        match id_receiver.changed().await {
1350            Ok(_) => Ok(*id_receiver.borrow()),
1351            Err(_) => Err(Box::new(IllegalStateException::new(
1352                "Failed to get subscription id",
1353            ))),
1354        }
1355    }
1356     */
1357
1358    /// Operation method that removes a `Subscription` that is currently in the "active" state.
1359    ///
1360    /// By bringing back a `Subscription` to the "inactive" state, the unsubscription from all its
1361    /// items is requested to Lightstreamer Server.
1362    ///
1363    /// Subscription can be unsubscribed from at any time. Once done the `Subscription` immediately
1364    /// exits the "active" state.
1365    ///
1366    /// Note that forwarding of the unsubscription to the server is made in a separate thread.
1367    ///
1368    /// The unsubscription will be notified through a `SubscriptionListener.onUnsubscription()` event.
1369    ///
1370    /// # Parameters
1371    ///
1372    /// * `subsrciption_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1373    /// * `subscription_id`: The id of the subscription to be unsubscribed from.
1374    ///   instance.
1375    pub async fn unsubscribe(
1376        subscription_sender: Sender<SubscriptionRequest>,
1377        subscription_id: usize,
1378    ) {
1379        subscription_sender
1380            .send(SubscriptionRequest {
1381                subscription: None,
1382                subscription_id: Some(subscription_id),
1383            })
1384            .await
1385            .unwrap()
1386    }
1387
1388    /// Method setting enum for the logging of this instance.
1389    ///
1390    /// Default logging type is StdLogs, corresponding to `stdout`
1391    ///
1392    /// `LightstreamerClient` has methods for logging that are compatible with the `Tracing` crate.
1393    /// Enabling logging for the `Tracing` crate requires implementation of a tracing subscriber
1394    /// and its configuration and formatting.
1395    ///
1396    /// # Parameters
1397    ///
1398    /// * `logging`: An enum declaring the logging type of this `LightstreamerClient` instance.
1399    pub fn set_logging_type(&mut self, logging: LogType) {
1400        self.logging = logging;
1401    }
1402
1403    /// Method for logging messages
1404    ///
1405    /// Match case wraps log types. `loglevel` param ignored in StdLogs case, all output to stdout.
1406    ///
1407    /// # Parameters
1408    ///
1409    /// * `loglevel` Enum determining use of stdout or Tracing subscriber.
1410    pub fn make_log(&mut self, loglevel: Level, log: &str) {
1411        match self.logging {
1412            LogType::StdLogs => {
1413                debug!("{}", log);
1414            }
1415            LogType::TracingLogs => match loglevel {
1416                Level::INFO => {
1417                    info!(log);
1418                }
1419                Level::WARN => {
1420                    warn!(log);
1421                }
1422                Level::ERROR => {
1423                    error!(log);
1424                }
1425                Level::TRACE => {
1426                    trace!(log);
1427                }
1428                Level::DEBUG => {
1429                    debug!(log);
1430                }
1431            },
1432        }
1433    }
1434
1435    /// Enables automatic reconnection with default configuration.
1436    pub fn enable_auto_reconnect(&mut self) {
1437        self.auto_reconnect_enabled = true;
1438        // ConnectionManager will be created during connect() to avoid circular dependency
1439    }
1440
1441    /// Enables automatic reconnection with the specified configuration.
1442    ///
1443    /// # Parameters
1444    ///
1445    /// * `reconnection_config`: Configuration for reconnection behavior
1446    /// * `heartbeat_config`: Configuration for heartbeat monitoring
1447    pub fn enable_auto_reconnect_with_config(
1448        &mut self,
1449        reconnection_config: ReconnectionConfig,
1450        heartbeat_config: HeartbeatConfig,
1451    ) -> Result<(), Box<dyn Error>> {
1452        self.reconnection_config = reconnection_config;
1453        self.heartbeat_config = heartbeat_config;
1454        self.auto_reconnect_enabled = true;
1455        // ConnectionManager will be created during connect() to avoid circular dependency
1456        Ok(())
1457    }
1458
1459    /// Disables automatic reconnection.
1460    pub async fn disable_auto_reconnect(&mut self) {
1461        self.auto_reconnect_enabled = false;
1462        if let Some(manager) = &self.connection_manager {
1463            manager.shutdown().await;
1464        }
1465        self.connection_manager = None;
1466    }
1467
1468    /// Returns whether automatic reconnection is currently enabled.
1469    pub fn is_auto_reconnect_enabled(&self) -> bool {
1470        self.auto_reconnect_enabled
1471    }
1472
1473    /// Gets the current reconnection configuration.
1474    pub fn get_reconnection_config(&self) -> &ReconnectionConfig {
1475        &self.reconnection_config
1476    }
1477
1478    /// Gets the current heartbeat configuration.
1479    pub fn get_heartbeat_config(&self) -> &HeartbeatConfig {
1480        &self.heartbeat_config
1481    }
1482
1483    /// Updates the reconnection configuration.
1484    pub fn set_reconnection_config(&mut self, config: ReconnectionConfig) {
1485        self.reconnection_config = config;
1486        // Note: ConnectionManager doesn't support runtime config updates
1487        // The configuration is set during ConnectionManager creation
1488    }
1489
1490    /// Updates the heartbeat configuration.
1491    pub fn set_heartbeat_config(&mut self, config: HeartbeatConfig) {
1492        self.heartbeat_config = config;
1493        // Note: ConnectionManager doesn't support runtime config updates
1494        // The configuration is set during ConnectionManager creation
1495    }
1496
1497    /// Gets the current connection state from the connection manager.
1498    pub async fn get_connection_state(&self) -> ConnectionState {
1499        if let Some(manager) = &self.connection_manager {
1500            manager.get_connection_state().await
1501        } else {
1502            ConnectionState::Disconnected
1503        }
1504    }
1505
1506    /// Gets connection metrics from the connection manager.
1507    pub async fn get_connection_metrics(&self) -> crate::connection::management::ConnectionMetrics {
1508        if let Some(manager) = &self.connection_manager {
1509            manager.get_metrics().await
1510        } else {
1511            crate::connection::management::ConnectionMetrics::default()
1512        }
1513    }
1514
1515    /// Forces an immediate reconnection attempt if auto-reconnect is enabled.
1516    pub async fn force_reconnect(&mut self) -> Result<(), Box<dyn Error>> {
1517        if !self.auto_reconnect_enabled {
1518            return Err("Auto-reconnect is not enabled".into());
1519        }
1520
1521        if let Some(manager) = &mut self.connection_manager {
1522            manager.force_reconnect().await?;
1523        }
1524
1525        Ok(())
1526    }
1527}
1528
1529#[cfg(test)]
1530mod tests {
1531    use super::*;
1532    use crate::subscription::{Subscription, SubscriptionListener, SubscriptionMode};
1533    use std::error::Error;
1534    use std::fmt::Debug;
1535    use std::sync::{Arc, Mutex};
1536    use tokio::sync::Notify;
1537
1538    #[derive(Debug)]
1539    struct MockClientListener {
1540        property_changes: Arc<Mutex<Vec<String>>>,
1541        status_changes: Arc<Mutex<Vec<String>>>,
1542        server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1543    }
1544
1545    impl MockClientListener {
1546        fn new() -> Self {
1547            MockClientListener {
1548                property_changes: Arc::new(Mutex::new(Vec::new())),
1549                status_changes: Arc::new(Mutex::new(Vec::new())),
1550                server_errors: Arc::new(Mutex::new(Vec::new())),
1551            }
1552        }
1553
1554        #[allow(dead_code)]
1555        fn with_shared_data(
1556            property_changes: Arc<Mutex<Vec<String>>>,
1557            status_changes: Arc<Mutex<Vec<String>>>,
1558            server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1559        ) -> Self {
1560            MockClientListener {
1561                property_changes,
1562                status_changes,
1563                server_errors,
1564            }
1565        }
1566    }
1567
1568    impl ClientListener for MockClientListener {
1569        fn on_property_change(&self, property: &str) {
1570            self.property_changes
1571                .lock()
1572                .unwrap()
1573                .push(property.to_string());
1574        }
1575
1576        fn on_status_change(&self, status: &str) {
1577            self.status_changes.lock().unwrap().push(status.to_string());
1578        }
1579
1580        fn on_server_error(&self, code: i32, message: &str) {
1581            self.server_errors
1582                .lock()
1583                .unwrap()
1584                .push((code, message.to_string()));
1585        }
1586    }
1587
1588    #[allow(dead_code)]
1589    struct MockSubscriptionListener;
1590
1591    impl SubscriptionListener for MockSubscriptionListener {
1592        fn on_subscription(&mut self) {}
1593        fn on_unsubscription(&mut self) {}
1594        fn on_item_update(&self, _update: &ItemUpdate) {}
1595    }
1596
1597    impl Debug for MockSubscriptionListener {
1598        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1599            write!(f, "MockSubscriptionListener")
1600        }
1601    }
1602
1603    #[allow(dead_code)]
1604    struct LightstreamerClientTestContext {
1605        client: LightstreamerClient,
1606        property_changes: Arc<Mutex<Vec<String>>>,
1607        status_changes: Arc<Mutex<Vec<String>>>,
1608        server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1609    }
1610
1611    impl LightstreamerClientTestContext {
1612        #[allow(dead_code)]
1613        fn new() -> Result<Self, Box<dyn Error>> {
1614            let property_changes = Arc::new(Mutex::new(Vec::new()));
1615            let status_changes = Arc::new(Mutex::new(Vec::new()));
1616            let server_errors = Arc::new(Mutex::new(Vec::new()));
1617            let listener = MockClientListener::with_shared_data(
1618                Arc::clone(&property_changes),
1619                Arc::clone(&status_changes),
1620                Arc::clone(&server_errors),
1621            );
1622
1623            let mut client = LightstreamerClient::new(
1624                Some("http://test.lightstreamer.com"),
1625                Some("DEMO"),
1626                None,
1627                None,
1628            )?;
1629            client.add_listener(Box::new(listener));
1630
1631            Ok(LightstreamerClientTestContext {
1632                client,
1633                property_changes,
1634                status_changes,
1635                server_errors,
1636            })
1637        }
1638    }
1639
1640    #[test]
1641    fn test_new_lightstreamer_client() {
1642        let result = LightstreamerClient::new(
1643            Some("http://test.lightstreamer.com"),
1644            Some("DEMO"),
1645            None,
1646            None,
1647        );
1648        assert!(result.is_ok());
1649        let client = result.unwrap();
1650        assert_eq!(
1651            client.server_address,
1652            Some("http://test.lightstreamer.com".to_string())
1653        );
1654        assert_eq!(client.adapter_set, Some("DEMO".to_string()));
1655        let result = LightstreamerClient::new(
1656            Some("http://test.lightstreamer.com"),
1657            Some("DEMO"),
1658            Some("user1"),
1659            Some("pass1"),
1660        );
1661        assert!(result.is_ok());
1662        let client = result.unwrap();
1663        assert_eq!(
1664            client.connection_details.get_user(),
1665            Some(&"user1".to_string())
1666        );
1667        assert_eq!(
1668            client.connection_details.get_password(),
1669            Some(&"pass1".to_string())
1670        );
1671        let result = LightstreamerClient::new(Some("invalid-url"), Some("DEMO"), None, None);
1672        assert!(result.is_err());
1673        let result = LightstreamerClient::new(None, Some("DEMO"), None, None);
1674        assert!(result.is_ok());
1675        let client = result.unwrap();
1676        assert_eq!(client.server_address, None);
1677    }
1678
1679    #[test]
1680    fn test_add_listener() {
1681        let result = LightstreamerClient::new(
1682            Some("http://test.lightstreamer.com"),
1683            Some("DEMO"),
1684            None,
1685            None,
1686        );
1687        assert!(result.is_ok());
1688        let mut client = result.unwrap();
1689        assert_eq!(client.listeners.len(), 0);
1690        let listener = Box::new(MockClientListener::new());
1691        client.add_listener(listener);
1692        assert_eq!(client.listeners.len(), 1);
1693        let listener2 = Box::new(MockClientListener::new());
1694        client.add_listener(listener2);
1695        assert_eq!(client.listeners.len(), 2);
1696    }
1697
1698    #[test]
1699    fn test_get_listeners() {
1700        let result = LightstreamerClient::new(
1701            Some("http://test.lightstreamer.com"),
1702            Some("DEMO"),
1703            None,
1704            None,
1705        );
1706        assert!(result.is_ok());
1707        let mut client = result.unwrap();
1708        assert_eq!(client.get_listeners().len(), 0);
1709
1710        let listener = Box::new(MockClientListener::new());
1711        client.add_listener(listener);
1712        assert_eq!(client.get_listeners().len(), 1);
1713    }
1714
1715    #[test]
1716    fn test_get_status() {
1717        let result = LightstreamerClient::new(
1718            Some("http://test.lightstreamer.com"),
1719            Some("DEMO"),
1720            None,
1721            None,
1722        );
1723        assert!(result.is_ok());
1724        let client = result.unwrap();
1725        match client.get_status() {
1726            ClientStatus::Disconnected(DisconnectionType::WillRetry) => {}
1727            _ => panic!("Expected initial status to be DISCONNECTED:WILL-RETRY"),
1728        }
1729    }
1730
1731    #[test]
1732    fn test_get_subscriptions() {
1733        let result = LightstreamerClient::new(
1734            Some("http://test.lightstreamer.com"),
1735            Some("DEMO"),
1736            None,
1737            None,
1738        );
1739        assert!(result.is_ok());
1740        let client = result.unwrap();
1741        assert_eq!(client.get_subscriptions().len(), 0);
1742    }
1743
1744    #[tokio::test]
1745    async fn test_connect_with_no_server_address() {
1746        let result = LightstreamerClient::new(None, Some("DEMO"), None, None);
1747        assert!(result.is_ok());
1748        let client = result.unwrap();
1749        let client_arc = Arc::new(tokio::sync::Mutex::new(client));
1750        let shutdown_signal = Arc::new(Notify::new());
1751        let result = LightstreamerClient::connect(client_arc, shutdown_signal).await;
1752        assert!(result.is_err());
1753    }
1754
1755    #[tokio::test]
1756    async fn test_forced_transport_validation() {
1757        let result = LightstreamerClient::new(
1758            Some("http://test.lightstreamer.com"),
1759            Some("DEMO"),
1760            None,
1761            None,
1762        );
1763        assert!(result.is_ok());
1764        let mut client = result.unwrap();
1765
1766        client.connection_options.set_forced_transport(None);
1767        let client_arc = Arc::new(tokio::sync::Mutex::new(client));
1768        let shutdown_signal = Arc::new(Notify::new());
1769        let result = LightstreamerClient::connect(client_arc.clone(), shutdown_signal).await;
1770        assert!(result.is_err());
1771        client_arc
1772            .lock()
1773            .await
1774            .connection_options
1775            .set_forced_transport(Some(Transport::WsStreaming));
1776    }
1777
1778    #[test]
1779    fn test_subscription_params_generation() {
1780        let subscription = Subscription::new(
1781            SubscriptionMode::Merge,
1782            Some(vec!["item1".to_string(), "item2".to_string()]),
1783            Some(vec!["field1".to_string(), "field2".to_string()]),
1784        )
1785        .unwrap();
1786
1787        let params = LightstreamerClient::get_subscription_params(&subscription, 1);
1788        assert!(params.is_ok());
1789        let params_str = params.unwrap();
1790
1791        assert!(params_str.contains("LS_reqId=1"));
1792        assert!(params_str.contains("LS_op=add"));
1793        assert!(params_str.contains("LS_subId="));
1794        assert!(params_str.contains("LS_mode=MERGE"));
1795        assert!(params_str.contains("LS_group="));
1796        assert!(params_str.contains("LS_schema="));
1797    }
1798
1799    #[test]
1800    fn test_unsubscription_params_generation() {
1801        let params = LightstreamerClient::get_unsubscription_params(42, 123);
1802        assert!(params.is_ok());
1803        let params_str = params.unwrap();
1804
1805        assert!(params_str.contains("LS_reqId=123"));
1806        assert!(params_str.contains("LS_op=delete"));
1807        assert!(params_str.contains("LS_subId=42"));
1808    }
1809
1810    #[test]
1811    fn test_logging_functions() {
1812        let result = LightstreamerClient::new(
1813            Some("http://test.lightstreamer.com"),
1814            Some("DEMO"),
1815            None,
1816            None,
1817        );
1818        assert!(result.is_ok());
1819        let mut client = result.unwrap();
1820
1821        client.set_logging_type(LogType::StdLogs);
1822
1823        client.make_log(Level::INFO, "Test log message");
1824        client.make_log(Level::DEBUG, "Test debug message");
1825        client.set_logging_type(LogType::TracingLogs);
1826        client.make_log(Level::INFO, "Test tracing log message");
1827        client.make_log(Level::DEBUG, "Test tracing debug message");
1828    }
1829
1830    #[test]
1831    fn test_debug_implementation() {
1832        let result = LightstreamerClient::new(
1833            Some("http://test.lightstreamer.com"),
1834            Some("DEMO"),
1835            None,
1836            None,
1837        );
1838        assert!(result.is_ok());
1839        let client = result.unwrap();
1840
1841        // Test that Debug implementation works without panicking
1842        let debug_string = format!("{:?}", client);
1843
1844        // Verify it contains expected fields
1845        assert!(debug_string.contains("server_address"));
1846        assert!(debug_string.contains("adapter_set"));
1847        assert!(debug_string.contains("connection_details"));
1848        assert!(debug_string.contains("connection_options"));
1849        assert!(debug_string.contains("listeners"));
1850        assert!(debug_string.contains("subscriptions"));
1851
1852        // Verify the values are included
1853        assert!(debug_string.contains("http://test.lightstreamer.com"));
1854        assert!(debug_string.contains("DEMO"));
1855    }
1856
1857    #[test]
1858    #[should_panic(expected = "Implement mechanism to add cookies to LightstreamerClient")]
1859    fn test_add_cookies() {
1860        // Test the static method add_cookies
1861        let cookie = Cookie::new("test_cookie", "test_value");
1862        LightstreamerClient::add_cookies("http://test.lightstreamer.com", &cookie);
1863    }
1864
1865    #[test]
1866    #[should_panic(expected = "not implemented")]
1867    fn test_get_cookies() {
1868        // Test the static method get_cookies
1869        LightstreamerClient::get_cookies(Some("http://test.lightstreamer.com"));
1870    }
1871}