lightstreamer_client/
ls_client.rs

1use crate::client_listener::ClientListener;
2use crate::client_message_listener::ClientMessageListener;
3use crate::connection_details::ConnectionDetails;
4use crate::connection_options::ConnectionOptions;
5use crate::error::IllegalStateException;
6use crate::item_update::ItemUpdate;
7use crate::subscription::{Snapshot, Subscription, SubscriptionMode};
8use crate::util::*;
9
10use cookie::Cookie;
11use futures_util::{SinkExt, StreamExt};
12use std::collections::HashMap;
13use std::error::Error;
14use std::fmt::{self, Debug, Formatter};
15use std::sync::Arc;
16use tokio::sync::{
17    mpsc::{self, Receiver, Sender},
18    Notify,
19};
20use tokio_tungstenite::{
21    connect_async,
22    tungstenite::{
23        http::{HeaderName, HeaderValue, Request},
24        Message,
25    },
26};
27use tracing::{debug, error, info, instrument, trace, warn, Level};
28use url::Url;
29
/// Represents the current status of the `LightstreamerClient`.
///
/// NOTE(review): the variants appear to mirror the status values of the official Lightstreamer
/// client API (`CONNECTING`, `CONNECTED:*`, `STALLED`, `DISCONNECTED:*`) — confirm against the
/// code that updates and reports the status.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientStatus {
    /// A connection attempt is under way.
    Connecting,
    /// A session is established; the payload tells which kind of connection is in use.
    Connected(ConnectionType),
    /// The connection is considered stalled.
    Stalled,
    /// There is no active session; the payload tells what the client will do next.
    Disconnected(DisconnectionType),
}

/// Kind of connection carried by [`ClientStatus::Connected`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionType {
    /// Polling over HTTP.
    HttpPolling,
    /// Streaming over HTTP.
    HttpStreaming,
    /// Stream-sensing phase (presumably the transport is still being selected — confirm).
    StreamSensing,
    /// Polling over WebSocket.
    WsPolling,
    /// Streaming over WebSocket.
    WsStreaming,
}

/// Detail carried by [`ClientStatus::Disconnected`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DisconnectionType {
    /// The client will retry the connection.
    WillRetry,
    /// The client is trying to recover the session.
    TryingRecovery,
}
50
/// Selects the logging backend used by the client.
///
/// NOTE(review): the code that consumes this value is not visible here; presumably
/// `TracingLogs` routes messages through the `tracing` macros and `StdLogs` through standard
/// output — confirm against the logging helper.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LogType {
    /// Log through the `tracing` infrastructure (assumed — see note above).
    TracingLogs,
    /// Log through the standard output facilities (assumed — see note above).
    StdLogs,
}
55
56pub struct SubscriptionRequest {
57    subscription: Option<Subscription>,
58    subscription_id: Option<usize>,
59}
60
61/// Facade class for the management of the communication to Lightstreamer Server. Used to provide
62/// configuration settings, event handlers, operations for the control of the connection lifecycle,
63/// Subscription handling and to send messages.
64///
65/// An instance of `LightstreamerClient` handles the communication with Lightstreamer Server on a
66/// specified endpoint. Hence, it hosts one "Session"; or, more precisely, a sequence of Sessions,
67/// since any Session may fail and be recovered, or it can be interrupted on purpose. So, normally,
68/// a single instance of `LightstreamerClient` is needed.
69///
70/// However, multiple instances of `LightstreamerClient` can be used, toward the same or multiple
71/// endpoints.
72///
73/// You can listen to the events generated by a session by registering an event listener, such as
74/// `ClientListener` or `SubscriptionListener`. These listeners allow you to handle various events,
75/// such as session creation, connection status, subscription updates, and server messages. However,
76/// you should be aware that the event notifications are dispatched by a single thread, the so-called
77/// event thread. This means that if the operations of a listener are slow or blocking, they will
78/// delay the processing of the other listeners and affect the performance of your application.
79/// Therefore, you should delegate any slow or blocking operations to a dedicated thread, and keep
80/// the listener methods as fast and simple as possible. Note that even if you create multiple
81/// instances of `LightstreamerClient`, they will all use a single event thread, that is shared
82/// among them.
83///
84/// # Parameters
85///
86/// * `server_address`: the address of the Lightstreamer Server to which this `LightstreamerClient`
87///   will connect to. It is possible to specify it later by using `None` here. See
88///   `ConnectionDetails.setServerAddress()` for details.
89/// * `adapter_set`: the name of the Adapter Set mounted on Lightstreamer Server to be used to handle
90///   all requests in the Session associated with this `LightstreamerClient`. It is possible not to
91///   specify it at all or to specify it later by using `None` here. See `ConnectionDetails.setAdapterSet()`
92///   for details.
93///
94/// # Raises
95///
96/// * `IllegalArgumentException`: if a not valid address is passed. See `ConnectionDetails.setServerAddress()`
97///   for details.
98pub struct LightstreamerClient {
99    /// The address of the Lightstreamer Server to which this `LightstreamerClient` will connect.
100    server_address: Option<String>,
101    /// The name of the Adapter Set mounted on Lightstreamer Server to be used to handle all
102    /// requests in the Session associated with this `LightstreamerClient`.
103    adapter_set: Option<String>,
104    /// Data object that contains the details needed to open a connection to a Lightstreamer Server.
105    /// This instance is set up by the `LightstreamerClient` object at its own creation. Properties
106    /// of this object can be overwritten by values received from a Lightstreamer Server.
107    pub connection_details: ConnectionDetails,
108    /// Data object that contains options and policies for the connection to the server. This instance
109    /// is set up by the `LightstreamerClient` object at its own creation. Properties of this object
110    /// can be overwritten by values received from a Lightstreamer Server.
111    pub connection_options: ConnectionOptions,
112    /// A list of listeners that will receive events from the `LightstreamerClient` instance.
113    listeners: Vec<Box<dyn ClientListener>>,
114    /// A list containing all the `Subscription` instances that are currently "active" on this
115    /// `LightstreamerClient`.
116    subscriptions: Vec<Subscription>,
117    /// The current status of the client.
118    status: ClientStatus,
119    /// Logging Type to be used
120    logging: LogType,
121    /// The sender that can be used to subscribe/unsubscribe
122    pub subscription_sender: Sender<SubscriptionRequest>,
123    /// The receiver used for subscribe/unsubsribe
124    subscription_receiver: Receiver<SubscriptionRequest>,
125}
126
127/// Retrieve a reference to a subscription with the given `id`
128fn get_subscription_by_id(
129    subscriptions: &Vec<Subscription>,
130    subscription_id: usize,
131) -> Option<&Subscription> {
132    subscriptions
133        .iter()
134        .find(|sub| sub.id == subscription_id)
135}
136
137impl Debug for LightstreamerClient {
138    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
139        f.debug_struct("LightstreamerClient")
140            .field("server_address", &self.server_address)
141            .field("adapter_set", &self.adapter_set)
142            .field("connection_details", &self.connection_details)
143            .field("connection_options", &self.connection_options)
144            .field("listeners", &self.listeners)
145            .field("subscriptions", &self.subscriptions)
146            .finish()
147    }
148}
149
150impl LightstreamerClient {
151    /// A constant string representing the name of the library.
152    pub const LIB_NAME: &'static str = "rust_client";
153
154    /// A constant string representing the version of the library.
155    pub const LIB_VERSION: &'static str = "0.1.0";
156
157    //
158    // Constants for WebSocket connection.
159    //
160    pub const SEC_WEBSOCKET_KEY: &'static str = "PNDUibe9ex7PnsrLbt0N4w==";
161    pub const SEC_WEBSOCKET_PROTOCOL: &'static str = "TLCP-2.4.0.lightstreamer.com";
162    pub const SEC_WEBSOCKET_VERSION: &'static str = "13";
163    pub const SEC_WEBSOCKET_UPGRADE: &'static str = "websocket";
164
165    /// A constant string representing the version of the TLCP protocol used by the library.
166    pub const TLCP_VERSION: &'static str = "TLCP-2.4.0";
167
168    /// Static method that can be used to share cookies between connections to the Server (performed by
169    /// this library) and connections to other sites that are performed by the application. With this
170    /// method, cookies received by the application can be added (or replaced if already present) to
171    /// the cookie set used by the library to access the Server. Obviously, only cookies whose domain
172    /// is compatible with the Server domain will be used internally.
173    ///
174    /// This method should be invoked before calling the `LightstreamerClient.connect()` method.
175    /// However it can be invoked at any time; it will affect the internal cookie set immediately
176    /// and the sending of cookies on the next HTTP request or WebSocket establishment.
177    ///
178    /// # Parameters
179    ///
180    /// * `uri`: the URI from which the supplied cookies were received. It cannot be `None`.
181    /// * `cookies`: an instance of `http.cookies.SimpleCookie`.
182    ///
183    /// See also `getCookies()`
184    pub fn add_cookies(_uri: &str, _cookies: &Cookie) {
185        // Implementation for add_cookies
186        unimplemented!("Implement mechanism to add cookies to LightstreamerClient");
187    }
188
189    /// Adds a listener that will receive events from the `LightstreamerClient` instance.
190    ///
191    /// The same listener can be added to several different `LightstreamerClient` instances.
192    ///
193    /// A listener can be added at any time. A call to add a listener already present will be ignored.
194    ///
195    /// # Parameters
196    ///
197    /// * `listener`: An object that will receive the events as documented in the `ClientListener`
198    ///   interface.
199    ///
200    /// See also `removeListener()`
201    pub fn add_listener(&mut self, listener: Box<dyn ClientListener>) {
202        self.listeners.push(listener);
203    }
204
205    /// Packs s string with the necessary parameters for a subscription request.
206    /// 
207    /// # Parameters
208    /// 
209    /// * `subscription`: The subscription for which to get the parameters.
210    /// * `request_id`: The request ID to use in the parameters.
211    /// 
212    fn get_subscription_params(
213        subscription: &Subscription,
214        request_id: usize,
215    ) -> Result<String, Box<dyn Error + Send + Sync>> {
216        let ls_req_id = request_id.to_string();
217        let ls_sub_id = subscription.id.to_string();
218        let ls_mode = subscription.get_mode().to_string();
219        let ls_group = match subscription.get_item_group() {
220            Some(item_group) => item_group.to_string(),
221            None => match subscription.get_items() {
222                Some(items) => items.join(" "),
223                None => {
224                    return Err(Box::new(std::io::Error::new(
225                        std::io::ErrorKind::InvalidData,
226                        "No item group or items found in subscription.",
227                    )));
228                }
229            },
230        };
231        let ls_schema = match subscription.get_field_schema() {
232            Some(field_schema) => field_schema.to_string(),
233            None => match subscription.get_fields() {
234                Some(fields) => fields.join(" "),
235                None => {
236                    return Err(Box::new(std::io::Error::new(
237                        std::io::ErrorKind::InvalidData,
238                        "No field schema or fields found in subscription.",
239                    )));
240                }
241            },
242        };
243        let ls_data_adapter = match subscription.get_data_adapter() {
244            Some(data_adapter) => data_adapter.to_string(),
245            None => "".to_string(),
246        };
247        let ls_snapshot = subscription
248            .get_requested_snapshot()
249            .unwrap_or_default()
250            .to_string();
251        //
252        // Prepare the subscription request.
253        //
254        let mut params: Vec<(&str, &str)> = vec![
255            ("LS_data_adapter", &ls_data_adapter),
256            ("LS_reqId", &ls_req_id),
257            ("LS_op", "add"),
258            ("LS_subId", &ls_sub_id),
259            ("LS_mode", &ls_mode),
260            ("LS_group", &ls_group),
261            ("LS_schema", &ls_schema),
262            ("LS_ack", "false"),
263        ];
264        // Remove the data adapter parameter if not specified.
265        if ls_data_adapter == "" {
266            params.remove(0);
267        }
268        if ls_snapshot != "" {
269            params.push(("LS_snapshot", &ls_snapshot));
270        }
271
272        Ok(serde_urlencoded::to_string(&params)?)
273    }
274    
275    fn get_unsubscription_params(
276        subscription_id: usize,
277        request_id: usize,
278    ) -> Result<String, Box<dyn Error + Send + Sync>> {
279        let ls_req_id = request_id.to_string();
280        let ls_sub_id = subscription_id.to_string();
281        //
282        // Prepare the unsubscription request.
283        //
284        let params: Vec<(&str, &str)> = vec![
285            ("LS_reqId", &ls_req_id),
286            ("LS_op", "delete"),
287            ("LS_subId", &ls_sub_id),
288        ];
289
290        Ok(serde_urlencoded::to_string(&params)?)
291    }
292
293    /// Operation method that requests to open a Session against the configured Lightstreamer Server.
294    ///
295    /// When `connect()` is called, unless a single transport was forced through `ConnectionOptions.setForcedTransport()`,
296    /// the so called "Stream-Sense" mechanism is started: if the client does not receive any answer
297    /// for some seconds from the streaming connection, then it will automatically open a polling
298    /// connection.
299    ///
300    /// A polling connection may also be opened if the environment is not suitable for a streaming
301    /// connection.
302    ///
303    /// Note that as "polling connection" we mean a loop of polling requests, each of which requires
304    /// opening a synchronous (i.e. not streaming) connection to Lightstreamer Server.
305    ///
306    /// Note that the request to connect is accomplished by the client in a separate thread; this
307    /// means that an invocation to `getStatus()` right after `connect()` might not reflect the change
308    /// yet.
309    ///
310    /// When the request to connect is finally being executed, if the current status of the client
311    /// is not `DISCONNECTED`, then nothing will be done.
312    ///
313    /// # Raises
314    ///
315    /// * `IllegalStateException`: if no server address was configured.
316    ///
317    /// See also `getStatus()`
318    ///
319    /// See also `disconnect()`
320    ///
321    /// See also `ClientListener.onStatusChange()`
322    ///
323    /// See also `ConnectionDetails.setServerAddress()`
324    #[instrument]
325    pub async fn connect(&mut self, shutdown_signal: Arc<Notify>) -> Result<(), Box<dyn Error + Send + Sync>> {
326        // Check if the server address is configured.
327        if self.server_address.is_none() {
328            return Err(Box::new(IllegalStateException::new(
329                "No server address was configured.",
330            )));
331        }
332        //
333        // Only WebSocket streaming transport is currently supported.
334        //
335        let forced_transport = self.connection_options.get_forced_transport();
336        if forced_transport.is_none()
337            || *forced_transport.unwrap() /* unwrap() is safe here */ != Transport::WsStreaming
338        {
339            return Err(Box::new(IllegalStateException::new(
340                "Only WebSocket streaming transport is currently supported.",
341            )));
342        }
343        //
344        // Convert the HTTP URL to a WebSocket URL.
345        //
346        let http_url = self.connection_details.get_server_address().unwrap(); // unwrap() is safe here.
347        let mut url = Url::parse(&http_url)
348            .expect("Failed to parse server address URL from connection details.");
349        match url.scheme() {
350            "http" => url
351                .set_scheme("ws")
352                .expect("Failed to set scheme to ws for WebSocket URL."),
353            "https" => url
354                .set_scheme("wss")
355                .expect("Failed to set scheme to wss for WebSocket URL."),
356            invalid_scheme => {
357                return Err(Box::new(IllegalStateException::new(&format!(
358                    "Unsupported scheme '{}' found when converting HTTP URL to WebSocket URL.",
359                    invalid_scheme
360                ))));
361            }
362        }
363        let ws_url = url.as_str();
364
365        // Build the WebSocket request with the necessary headers.
366        let request = Request::builder()
367            .uri(ws_url)
368            .header(
369                HeaderName::from_static("connection"),
370                HeaderValue::from_static("Upgrade"),
371            )
372            .header(
373                HeaderName::from_static("host"),
374                HeaderValue::from_str(url.host_str().unwrap_or("localhost")).map_err(|err| {
375                    IllegalStateException::new(&format!(
376                        "Invalid header value for header with name 'host': {}",
377                        err
378                    ))
379                })?,
380            )
381            .header(
382                HeaderName::from_static("sec-websocket-key"),
383                HeaderValue::from_static(Self::SEC_WEBSOCKET_KEY),
384            )
385            .header(
386                HeaderName::from_static("sec-websocket-protocol"),
387                HeaderValue::from_static(Self::SEC_WEBSOCKET_PROTOCOL),
388            )
389            .header(
390                HeaderName::from_static("sec-websocket-version"),
391                HeaderValue::from_static(Self::SEC_WEBSOCKET_VERSION),
392            )
393            .header(
394                HeaderName::from_static("upgrade"),
395                HeaderValue::from_static(Self::SEC_WEBSOCKET_UPGRADE),
396            )
397            .body(())?;
398
399        // Connect to the Lightstreamer server using WebSocket.
400        let ws_stream = match connect_async(request).await {
401            Ok((ws_stream, response)) => {
402                if let Some(server_header) = response.headers().get("server") {
403                    self.make_log(
404                        Level::INFO,
405                        &format!(
406                            "Connected to Lightstreamer server: {}",
407                            server_header.to_str().unwrap_or("")
408                        ),
409                    );
410                } else {
411                    self.make_log(Level::INFO, "Connected to Lightstreamer server");
412                }
413                ws_stream
414            }
415            Err(err) => {
416                return Err(Box::new(std::io::Error::new(
417                    std::io::ErrorKind::ConnectionRefused,
418                    format!(
419                        "Failed to connect to Lightstreamer server with WebSocket: {}",
420                        err
421                    ),
422                )));
423            }
424        };
425
426        // Split the WebSocket stream into a write and a read stream.
427        let (mut write_stream, mut read_stream) = ws_stream.split();
428
429        //
430        // Initiate communication with the server by sending a 'wsok' message.
431        //
432        write_stream.send(Message::Text("wsok".into())).await?;
433
434        //
435        // Start reading and processing messages from the server.
436        //
437        let mut is_connected = false;
438        let mut request_id: usize = 0;
439        let mut _session_id: Option<String> = None;
440        let mut subscription_id: usize = 0;
441        let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> =
442            HashMap::new();
443        loop {
444            tokio::select! {
445                message = read_stream.next() => {
446                    match message {
447                        Some(Ok(Message::Text(text))) => {
448                            // Messages could include multiple submessages separated by /r/n.
449                            // Split the message into submessages and process each one separately.
450                            let submessages: Vec<&str> = text.split("\r\n")
451                                .filter(|&line| !line.trim().is_empty()) // Filter out empty lines.
452                                .collect();
453                            for submessage in submessages {
454                                let clean_text = clean_message(&submessage);
455                                let submessage_fields: Vec<&str> = clean_text.split(",").collect();
456                                match *submessage_fields.first().unwrap_or(&"") {
457                                    //
458                                    // Errors from server.
459                                    //
460                                    "conerr" | "reqerr" => {
461                                        self.make_log( Level::ERROR, &format!("Received connection error from Lightstreamer server: {}", clean_text) );
462                                        break;
463                                    },
464                                    //
465                                    // Session created successfully.
466                                    //
467                                    "conok" => {
468                                        is_connected = true;
469                                        if let Some(session_id) = submessage_fields.get(1).as_deref() {
470                                            self.make_log( Level::DEBUG, &format!("Session creation confirmed by server: {}", clean_text) );
471                                            self.make_log( Level::DEBUG, &format!("Session created with ID: {:?}", session_id) );
472                                            //
473                                            // Subscribe to the desired items.
474                                            //
475                                            while let Some(subscription) = self.subscriptions.get_mut(subscription_id) {
476                                                //
477                                                // Gather all the necessary subscription parameters.
478                                                //
479                                                subscription_id += 1;
480                                                request_id += 1;
481                                                subscription.id = subscription_id;
482                                                subscription.id_sender.send(subscription_id)?;
483
484                                                let encoded_params = match Self::get_subscription_params(subscription, request_id)
485                                                {
486                                                    Ok(params) => params,
487                                                    Err(err) => {
488                                                        return Err(err);
489                                                    },
490                                                };
491
492                                                write_stream
493                                                    .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
494                                                    .await?;
495                                                info!("Sent subscription request: '{}'", encoded_params);
496                                            }
497                                        } else {
498                                            return Err(Box::new(std::io::Error::new(
499                                                std::io::ErrorKind::InvalidData,
500                                                "Session ID not found in 'conok' message from server",
501                                            )));
502                                        }
503                                    },
504                                    //
505                                    // Notifications from server.
506                                    //
507                                    "conf" | "cons" | "clientip" | "servname" | "prog" | "sync" | "eos" => {
508                                        self.make_log( Level::INFO, &format!("Received notification from server: {}", clean_text) );
509                                        // Don't do anything with these notifications for now.
510                                    },
511                                    "probe" => {
512                                        self.make_log( Level::DEBUG, &format!("Received probe message from server: {}", clean_text ) );
513                                    },
514                                    "reqok" => {
515                                        self.make_log( Level::DEBUG, &format!("Received reqok message from server: '{}'", clean_text ) );
516                                    },
517                                    //
518                                    // Subscription confirmation from server.
519                                    //
520                                    "subok" => {
521                                        self.make_log( Level::INFO, &format!("Subscription confirmed by server: '{}'", clean_text) );
522                                    },
523                                    //
524                                    // Usubscription confirmation from server.
525                                    //
526                                    "unsub" => {
527                                        self.make_log( Level::INFO, &format!("Unsubscription confirmed by server: '{}'", clean_text) );
528                                    },
529                                    //
530                                    // Data updates from server.
531                                    //
532                                    "u" => {
533                                        // Parse arguments from the received message.
534                                        let arguments = parse_arguments(&clean_text);
535                                        //
536                                        // Extract the subscription from the first argument.
537                                        //
538                                        let subscription_index = arguments.get(1).unwrap_or(&"").parse::<usize>().unwrap_or(0);
539                                        let subscription = match get_subscription_by_id(self.get_subscriptions(), subscription_index) {
540                                            Some(subscription) => subscription,
541                                            None => {
542                                                self.make_log( Level::WARN, &format!("Subscription not found for index: {}", subscription_index) );
543                                                continue;
544
545                                            }
546                                        };
547                                        //
548                                        // Extract the item from the second argument.
549                                        //
550                                        let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0);
551                                        let item = match subscription.get_items() {
552                                            Some(items) => items.get(item_index-1),
553                                            None => {
554                                                self.make_log( Level::WARN, &format!("No items found in subscription: {:?}", subscription) );
555                                                continue;
556                                            }
557                                        };
558                                        //
559                                        // Determine if the update is a snapshot or real-time update based on the subscription parameters.
560                                        //
561                                        let is_snapshot = match subscription.get_requested_snapshot() {
562                                            Some(ls_snapshot) => {
563                                                match ls_snapshot {
564                                                    Snapshot::No => false,
565                                                    Snapshot::Yes => {
566                                                        match subscription.get_mode() {
567                                                            SubscriptionMode::Merge => {
568                                                                if arguments.len() == 4 && arguments[3] == "$" {
569                                                                    // EOS notification received
570                                                                    true
571                                                                } else {
572                                                                    // If item doesn't exist in item_updates yet, the first update
573                                                                    // is always a snapshot.
574                                                                    if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) {
575                                                                        if let Some(_) = item_updates.get(&(item_index)) {
576                                                                            // Item update already exists in item_updates, so it's not a snapshot.
577                                                                            false
578                                                                        } else {
579                                                                            // Item update doesn't exist in item_updates, so the first update is always a snapshot.
580                                                                            true
581                                                                        }
582                                                                    } else {
583                                                                        // Item updates not found for subscription, so the first update is always a snapshot.
584                                                                        true
585                                                                    }
586                                                                }
587                                                            },
588                                                            SubscriptionMode::Distinct | SubscriptionMode::Command => {
589                                                                if !subscription.is_subscribed() {
590                                                                    true
591                                                                } else {
592                                                                    false
593                                                                }
594                                                            },
595                                                            _ => false,
596                                                        }
597                                                    },
598                                                    _ => false,
599                                                }
600                                            },
601                                            None => false,
602                                        };
603
604                                        // Extract the field values from the third argument.
605                                        let field_values: Vec<&str> = arguments.get(3).unwrap_or(&"").split('|').collect();
606
607                                        //
608                                        // Get fields from subscription and create a HashMap of field names and values.
609                                        //
610                                        let subscription_fields = subscription.get_fields();
611                                        let mut field_map: HashMap<String, Option<String>> = subscription_fields
612                                            .map(|fields| fields.iter().map(|field_name| (field_name.to_string(), None)).collect())
613                                            .unwrap_or_default();
614
615                                        let mut field_index = 0;
616                                        for value in field_values {
617                                            match value {
618                                                "" => {
619                                                    // An empty value means the field is unchanged compared to the previous update of the same field.
620                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
621                                                        field_map.insert(field_name.to_string(), None);
622                                                    }
623                                                    field_index += 1;
624                                                }
625                                                "#" | "$" => {
626                                                    // A value corresponding to a hash sign "#" or dollar sign "$" means the field is null or empty.
627                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
628                                                        field_map.insert(field_name.to_string(), Some("".to_string()));
629                                                    }
630                                                    field_index += 1;
631                                                }
632                                                value if value.starts_with('^') => {
633                                                    let command = value.chars().nth(1).unwrap_or(' ');
634                                                    match command {
635                                                        '0'..='9' => {
636                                                            let count = value[1..].parse().unwrap_or(0);
637                                                            for i in 0..count {
638                                                                if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index + i)) {
639                                                                    field_map.insert(field_name.to_string(), None);
640                                                                }
641                                                            }
642                                                            field_index += count;
643                                                        }
644                                                        'P' | 'T' => {
645                                                            let diff_value = serde_urlencoded::from_str(&value[2..]).unwrap_or_else(|_| value[2..].to_string());
646                                                            if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
647                                                                if let Some(prev_value) = field_map.get(field_name).and_then(|v| v.as_ref()) {
648                                                                    let new_value = match command {
649                                                                        'P' => {
650                                                                            // Apply JSON Patch
651                                                                            let patch: serde_json::Value = serde_json::from_str(&diff_value).unwrap_or(serde_json::Value::Null);
652                                                                            let mut prev_json: serde_json::Value = serde_json::from_str(prev_value).unwrap_or(serde_json::Value::Null);
653                                                                            let patch_operations: Vec<json_patch::PatchOperation> = serde_json::from_value(patch).unwrap_or_default();
654                                                                            let _ = json_patch::patch(&mut prev_json, &patch_operations);
655                                                                            prev_json.to_string()
656                                                                        }
657                                                                        'T' => {
658                                                                            // Apply TLCP-diff
659                                                                            //tlcp_diff::apply_diff(prev_value, &diff_value).unwrap_or_else(|_| prev_value.to_string())
660                                                                            unimplemented!("Implement TLCP-diff");
661                                                                        }
662                                                                        _ => unreachable!(),
663                                                                    };
664                                                                    field_map.insert(field_name.to_string(), Some(new_value.to_string()));
665                                                                }
666                                                            }
667                                                            field_index += 1;
668                                                        }
669                                                        _ => {
670                                                            let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
671                                                            if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
672                                                                field_map.insert(field_name.to_string(), Some(decoded_value));
673                                                            }
674                                                            field_index += 1;
675                                                        }
676                                                    }
677                                                }
678                                                value if value.starts_with('{') => {
679                                                    // in this case it is a json payload that we will let the consumer handle. In this case, it is important
680                                                    // to preserve casing for parsing.
681                                                    let original_json = parse_arguments(&submessage).get(3).unwrap_or(&"").split('|').collect::<Vec<&str>>();
682                                                    let mut payload = "";
683                                                    for json in original_json.iter()
684                                                    {
685                                                        if json.is_empty() || json.to_string() == "#"
686                                                        {
687                                                            continue;
688                                                        }
689                                                        
690                                                        payload = json;
691                                                    }
692                                                    
693                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
694                                                        field_map.insert(field_name.to_string(), Some(payload.to_string()));
695                                                    }
696                                                    field_index += 1;
697                                                }
698                                                _ => {
699                                                    let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
700                                                    if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
701                                                        field_map.insert(field_name.to_string(), Some(decoded_value));
702                                                    }
703                                                    field_index += 1;
704                                                }
705                                            }
706                                        }
707
708                                        // Store only item_update's changed fields.
709                                        let changed_fields: HashMap<String, String> = field_map.iter()
710                                            .filter_map(|(k, v)| {
711                                                if let Some(v) = v {
712                                                    Some((k.clone(), v.clone()))
713                                                } else {
714                                                    None
715                                                }
716                                            })
717                                            .collect();
718
719                                        //
720                                        // Take the proper item_update from item_updates and update it with changed fields.
721                                        // If the item_update doesn't exist yet, create a new one.
722                                        //
723                                        let current_item_update: ItemUpdate;
724                                        match subscription_item_updates.get_mut(&(subscription_index)) {
725                                            Some(item_updates) => match item_updates.get_mut(&(item_index)) {
726                                                Some(item_update) => {
727                                                    //
728                                                    // Iterate changed_fields and update existing item_update.fields assigning the new values.
729                                                    //
730                                                    for (field_name, new_value) in &changed_fields {
731                                                        if item_update.fields.contains_key(field_name) {
732                                                            item_update.fields.insert((*field_name).clone(), Some(new_value.clone()));
733                                                        }
734                                                    }
735                                                    item_update.changed_fields = changed_fields;
736                                                    item_update.is_snapshot = is_snapshot;
737                                                    current_item_update = item_update.clone();
738                                                },
739                                                None => {
740                                                    // Create a new item_update and add it to item_updates.
741                                                    let item_update = ItemUpdate {
742                                                        item_name: item.cloned(),
743                                                        item_pos: item_index,
744                                                        fields: field_map,
745                                                        changed_fields: changed_fields,
746                                                        is_snapshot: is_snapshot,
747                                                    };
748                                                    current_item_update = item_update.clone();
749                                                    item_updates.insert(item_index, item_update);
750                                                }
751                                            },
752                                            None => {
753                                                // Create a new item_update and add it to item_updates.
754                                                let item_update = ItemUpdate {
755                                                    item_name: item.cloned(),
756                                                    item_pos: item_index,
757                                                    fields: field_map,
758                                                    changed_fields: changed_fields,
759                                                    is_snapshot: is_snapshot,
760                                                };
761                                                current_item_update = item_update.clone();
762                                                let mut item_updates = HashMap::new();
763                                                item_updates.insert(item_index, item_update);
764                                                subscription_item_updates.insert(subscription_index, item_updates);
765                                            }
766                                        };
767
768                                        // Get mutable subscription listeners directly.
769                                        let subscription_listeners = subscription.get_listeners();
770
771                                        // Iterate subscription listeners and call on_item_update for each listener.
772                                        for listener in subscription_listeners {
773                                            listener.on_item_update(&current_item_update);
774                                        }
775                                    }
776                                    //
777                                    // Connection confirmation from server.
778                                    //
779                                    "wsok" => {
780                                        self.make_log( Level::INFO, &format!("Connection confirmed by server: '{}'", clean_text) );
781                                        //
782                                        // Request session creation.
783                                        //
784                                        let ls_adapter_set = match self.connection_details.get_adapter_set() {
785                                            Some(adapter_set) => adapter_set,
786                                            None => {
787                                                return Err(Box::new(IllegalStateException::new(
788                                                    "No adapter set found in connection details.",
789                                                )));
790                                            },
791                                        };
792                                        let ls_send_sync = self.connection_options.get_send_sync().to_string();
793                                        let mut params: Vec<(&str, &str)> = vec![
794                                            ("LS_adapter_set", &ls_adapter_set),
795                                            ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"),
796                                            ("LS_send_sync", &ls_send_sync),
797                                        ];
798                                        if let Some(user) = &self.connection_details.get_user() {
799                                            params.push(("LS_user", user));
800                                        }
801                                        if let Some(password) = &self.connection_details.get_password() {
802                                            params.push(("LS_password", password));
803                                        }
804                                        params.push(("LS_protocol", Self::TLCP_VERSION));
805                                        let encoded_params = serde_urlencoded::to_string(&params)?;
806                                        write_stream
807                                            .send(Message::Text(format!("create_session\r\n{}\n", encoded_params).into()))
808                                            .await?;
809                                        self.make_log( Level::DEBUG, &format!("Sent create session request: '{}'", encoded_params) );
810                                    },
811                                    unexpected_message => {
812                                        return Err(Box::new(std::io::Error::new(
813                                            std::io::ErrorKind::InvalidData,
814                                            format!(
815                                                "Unexpected message received from server: '{:?}'",
816                                                unexpected_message
817                                            ),
818                                        )));
819                                    },
820                                }
821                            }
822                        },
823                        Some(Ok(non_text_message)) => {
824                            return Err(Box::new(std::io::Error::new(
825                                std::io::ErrorKind::InvalidData,
826                                format!(
827                                    "Unexpected non-text message from server: {:?}",
828                                    non_text_message
829                                ),
830                            )));
831                        },
832                        Some(Err(err)) => {
833                            return Err(Box::new(std::io::Error::new(
834                                std::io::ErrorKind::InvalidData,
835                                format!("Error reading message from server: {}", err),
836                            )));
837                        },
838                        None => {
839                            self.make_log( Level::DEBUG, "No more messages from server" );
840                            break;
841                        },
842                    }
843                },
844                Some(subscription_request) = self.subscription_receiver.recv() => {
845                    println!("Received subscription/unsubscription request.");
846                    request_id += 1;
847                    // Process subscription requests.
848                    if subscription_request.subscription.is_some()
849                    {
850                        self.subscriptions.push(subscription_request.subscription.unwrap());
851
852                        // if we are not connected yet, we will subscribe later
853                        if !is_connected {
854                            continue;
855                        }
856
857                        subscription_id += 1;
858                        self.subscriptions.last_mut().unwrap().id = subscription_id;
859                        self.subscriptions.last().unwrap().id_sender.send(subscription_id)?;
860
861                        let encoded_params = match Self::get_subscription_params(self.subscriptions.last().unwrap(), request_id)
862                        {
863                            Ok(params) => params,
864                            Err(err) => {
865                                return Err(err);
866                            },
867                        };
868
869                        write_stream
870                            .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
871                            .await?;
872                        
873                        self.make_log( Level::INFO, &format!("Sent subscription request: '{}'", encoded_params) );
874                    }
875                    // Process unsubscription requests.
876                    else if subscription_request.subscription_id.is_some()
877                    {
878                        let unsubscription_id = subscription_request.subscription_id.unwrap();
879                        let encoded_params = match Self::get_unsubscription_params(unsubscription_id, request_id)
880                        {
881                            Ok(params) => params,
882                            Err(err) => {
883                                return Err(err);
884                            },
885                        };
886                        
887                        write_stream
888                            .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
889                            .await?;
890                        
891                        self.make_log( Level::INFO, &format!("Sent unsubscription request: '{}'", encoded_params) );
892                        
893                        self.subscriptions.retain(|s| s.id != unsubscription_id);
894                        
895                        if self.subscriptions.is_empty()
896                        {
897                            self.make_log( Level::INFO, &"No more subscriptions, disconnecting".to_string() );
898                            shutdown_signal.notify_one();
899                        }
900                    }
901                },
902                _ = shutdown_signal.notified() => {
903                    self.make_log( Level::INFO, &format!("Received shutdown signal") );
904                    break;
905                },
906            }
907        }
908
909        Ok(())
910    }
911
912    /// Operation method that requests to close the Session opened against the configured Lightstreamer
913    /// Server (if any).
914    ///
915    /// When `disconnect()` is called, the "Stream-Sense" mechanism is stopped.
916    ///
917    /// Note that active `Subscription` instances, associated with this `LightstreamerClient` instance,
918    /// are preserved to be re-subscribed to on future Sessions.
919    ///
920    /// Note that the request to disconnect is accomplished by the client in a separate thread; this
921    /// means that an invocation to `getStatus()` right after `disconnect()` might not reflect the
922    /// change yet.
923    ///
924    /// When the request to disconnect is finally being executed, if the status of the client is
925    /// "DISCONNECTED", then nothing will be done.
926    ///
927    /// See also `connect()`
928    #[instrument]
929    pub async fn disconnect(&mut self) {
930        // Implementation for disconnect
931        self.make_log( Level::INFO, "Disconnecting from Lightstreamer server" );
932    }
933
934    /// Static inquiry method that can be used to share cookies between connections to the Server
935    /// (performed by this library) and connections to other sites that are performed by the application.
936    /// With this method, cookies received from the Server can be extracted for sending through other
937    /// connections, according with the URI to be accessed.
938    ///
939    /// See `addCookies()` for clarifications on when cookies are directly stored by the library and
940    /// when not.
941    ///
942    /// # Parameters
943    ///
944    /// * `uri`: the URI to which the cookies should be sent, or `None`.
945    ///
946    /// # Returns
947    ///
948    /// A list with the various cookies that can be sent in a HTTP request for the specified URI.
949    /// If a `None` URI was supplied, all available non-expired cookies will be returned.
950    pub fn get_cookies(_uri: Option<&str>) -> Cookie {
951        // Implementation for get_cookies
952        unimplemented!()
953    }
954
955    /// Returns a list containing the `ClientListener` instances that were added to this client.
956    ///
957    /// # Returns
958    ///
959    /// A list containing the listeners that were added to this client.
960    ///
961    /// See also `addListener()`
962    pub fn get_listeners(&self) -> &Vec<Box<dyn ClientListener>> {
963        &self.listeners
964    }
965
966    /// Inquiry method that gets the current client status and transport (when applicable).
967    ///
968    /// # Returns
969    ///
970    /// The current client status. It can be one of the following values:
971    ///
972    /// - `"CONNECTING"`: the client is waiting for a Server's response in order to establish a connection;
973    /// - `"CONNECTED:STREAM-SENSING"`: the client has received a preliminary response from the server
974    ///   and is currently verifying if a streaming connection is possible;
975    /// - `"CONNECTED:WS-STREAMING"`: a streaming connection over WebSocket is active;
976    /// - `"CONNECTED:HTTP-STREAMING"`: a streaming connection over HTTP is active;
977    /// - `"CONNECTED:WS-POLLING"`: a polling connection over WebSocket is in progress;
978    /// - `"CONNECTED:HTTP-POLLING"`: a polling connection over HTTP is in progress;
979    /// - `"STALLED"`: the Server has not been sending data on an active streaming connection for
980    ///   longer than a configured time;
981    /// - `"DISCONNECTED:WILL-RETRY"`: no connection is currently active but one will be opened
982    ///   (possibly after a timeout);
983    /// - `"DISCONNECTED:TRYING-RECOVERY"`: no connection is currently active, but one will be opened
984    ///   as soon as possible, as an attempt to recover the current session after a connection issue;
985    /// - `"DISCONNECTED"`: no connection is currently active.
986    ///
987    /// See also `ClientListener.onStatusChange()`
988    pub fn get_status(&self) -> &ClientStatus {
989        &self.status
990    }
991
992    /// Inquiry method that returns a list containing all the `Subscription` instances that are
993    /// currently "active" on this `LightstreamerClient`.
994    ///
995    /// Internal second-level `Subscription` are not included.
996    ///
997    /// # Returns
998    ///
999    /// A list, containing all the `Subscription` currently "active" on this `LightstreamerClient`.
1000    /// The list can be empty.
1001    ///
1002    /// See also `subscribe()`
1003    pub fn get_subscriptions(&self) -> &Vec<Subscription> {
1004        &self.subscriptions
1005    }
1006
1007    /// Creates a new instance of `LightstreamerClient`.
1008    ///
1009    /// The constructor initializes the client with the server address and adapter set, if provided.
1010    /// It sets up the connection details and options for the client. If no server address or
1011    /// adapter set is specified, those properties on the client will be `None`. This allows
1012    /// for late configuration of these details before connecting to the Lightstreamer server.
1013    ///
1014    /// # Arguments
1015    /// * `server_address` - An optional reference to a string slice that represents the server
1016    ///   address to connect to. If `None`, the server address must be set later.
1017    /// * `adapter_set` - An optional reference to a string slice that specifies the adapter set name.
1018    ///   If `None`, the adapter set must be set later.
1019    ///
1020    /// # Returns
1021    /// A result containing the new `LightstreamerClient` instance if successful, or an
1022    /// `IllegalStateException` if the initialization fails due to invalid state conditions.
1023    ///
1024    /// # Panics
1025    /// Does not panic under normal circumstances. However, unexpected internal errors during
1026    /// the creation of internal components could cause panics, which should be considered when
1027    /// using this function in production code.
1028    ///
1029    /// # Example
1030    /// ```
1031    /// // Example usage of `new` to create a LightstreamerClient with specified server address and
1032    /// // adapter set.
1033    /// use lightstreamer_client::ls_client::LightstreamerClient;
1034    /// let server_address = Some("http://myserver.com");
1035    /// let adapter_set = Some("MY_ADAPTER_SET");
1036    /// let ls_client = LightstreamerClient::new(server_address, adapter_set, None, None);
1037    /// assert!(ls_client.is_ok());
1038    /// ```
1039    pub fn new(
1040        server_address: Option<&str>,
1041        adapter_set: Option<&str>,
1042        username: Option<&str>,
1043        password: Option<&str>,
1044    ) -> Result<LightstreamerClient, Box<dyn Error>> {
1045        let connection_details =
1046            ConnectionDetails::new(server_address, adapter_set, username, password)?;
1047        let connection_options = ConnectionOptions::default();
1048        let (subscription_sender, subscription_receiver) = mpsc::channel(100);
1049
1050        Ok(LightstreamerClient {
1051            server_address: server_address.map(|s| s.to_string()),
1052            adapter_set: adapter_set.map(|s| s.to_string()),
1053            connection_details,
1054            connection_options,
1055            listeners: Vec::new(),
1056            subscriptions: Vec::new(),
1057            status: ClientStatus::Disconnected(DisconnectionType::WillRetry),
1058            logging: LogType::StdLogs,
1059            subscription_sender,
1060            subscription_receiver,
1061        })
1062    }
1063
1064    /// Removes a listener from the `LightstreamerClient` instance so that it will not receive
1065    /// events anymore.
1066    ///
1067    /// A listener can be removed at any time.
1068    ///
1069    /// # Parameters
1070    ///
1071    /// * `listener`: The listener to be removed.
1072    ///
1073    /// See also `addListener()`
1074    pub fn remove_listener(&mut self, _listener: Box<dyn ClientListener>) {
1075        unimplemented!("Implement mechanism to remove listener from LightstreamerClient");
1076        //self.listeners.remove(&listener);
1077    }
1078
1079    /// Operation method that sends a message to the Server. The message is interpreted and handled
1080    /// by the Metadata Adapter associated to the current Session. This operation supports in-order
1081    /// guaranteed message delivery with automatic batching. In other words, messages are guaranteed
1082    /// to arrive exactly once and respecting the original order, whatever is the underlying transport
1083    /// (HTTP or WebSockets). Furthermore, high frequency messages are automatically batched, if
1084    /// necessary, to reduce network round trips.
1085    ///
1086    /// Upon subsequent calls to the method, the sequential management of the involved messages is
1087    /// guaranteed. The ordering is determined by the order in which the calls to `sendMessage` are
1088    /// issued.
1089    ///
1090    /// If a message, for any reason, doesn't reach the Server (this is possible with the HTTP transport),
1091    /// it will be resent; however, this may cause the subsequent messages to be delayed. For this
1092    /// reason, each message can specify a "delayTimeout", which is the longest time the message,
1093    /// after reaching the Server, can be kept waiting if one of more preceding messages haven't
1094    /// been received yet. If the "delayTimeout" expires, these preceding messages will be discarded;
1095    /// any discarded message will be notified to the listener through `ClientMessageListener.onDiscarded()`.
1096    /// Note that, because of the parallel transport of the messages, if a zero or very low timeout
1097    /// is set for a message and the previous message was sent immediately before, it is possible
1098    /// that the latter gets discarded even if no communication issues occur. The Server may also
1099    /// enforce its own timeout on missing messages, to prevent keeping the subsequent messages for
1100    /// long time.
1101    ///
1102    /// Sequence identifiers can also be associated with the messages. In this case, the sequential
1103    /// management is restricted to all subsets of messages with the same sequence identifier associated.
1104    ///
1105    /// Notifications of the operation outcome can be received by supplying a suitable listener. The
1106    /// supplied listener is guaranteed to be eventually invoked; listeners associated with a sequence
1107    /// are guaranteed to be invoked sequentially.
1108    ///
1109    /// The "UNORDERED_MESSAGES" sequence name has a special meaning. For such a sequence, immediate
1110    /// processing is guaranteed, while strict ordering and even sequentialization of the processing
1111    /// is not enforced. Likewise, strict ordering of the notifications is not enforced. However,
1112    /// messages that, for any reason, should fail to reach the Server whereas subsequent messages
1113    /// had succeeded, might still be discarded after a server-side timeout, in order to ensure that
1114    /// the listener eventually gets a notification.
1115    ///
1116    /// Moreover, if "UNORDERED_MESSAGES" is used and no listener is supplied, a "fire and forget"
1117    /// scenario is assumed. In this case, no checks on missing, duplicated or overtaken messages
1118    /// are performed at all, so as to optimize the processing and allow the highest possible throughput.
1119    ///
1120    /// Since a message is handled by the Metadata Adapter associated to the current connection, a
1121    /// message can be sent only if a connection is currently active. If the special `enqueueWhileDisconnected`
1122    /// flag is specified it is possible to call the method at any time and the client will take
1123    /// care of sending the message as soon as a connection is available, otherwise, if the current
1124    /// status is "DISCONNECTED*", the message will be abandoned and the `ClientMessageListener.onAbort()`
1125    /// event will be fired.
1126    ///
1127    /// Note that, in any case, as soon as the status switches again to "DISCONNECTED*", any message
1128    /// still pending is aborted, including messages that were queued with the `enqueueWhileDisconnected`
1129    /// flag set to `true`.
1130    ///
1131    /// Also note that forwarding of the message to the server is made in a separate thread, hence,
1132    /// if a message is sent while the connection is active, it could be aborted because of a subsequent
1133    /// disconnection. In the same way a message sent while the connection is not active might be
1134    /// sent because of a subsequent connection.
1135    ///
1136    /// # Parameters
1137    ///
1138    /// * `message`: a text message, whose interpretation is entirely demanded to the Metadata Adapter
1139    ///   associated to the current connection.
1140    /// * `sequence`: an alphanumeric identifier, used to identify a subset of messages to be managed
1141    ///   in sequence; underscore characters are also allowed. If the "UNORDERED_MESSAGES" identifier
1142    ///   is supplied, the message will be processed in the special way described above. The parameter
1143    ///   is optional; if set to `None`, "UNORDERED_MESSAGES" is used as the sequence name.
1144    /// * `delay_timeout`: a timeout, expressed in milliseconds. If higher than the Server configured
1145    ///   timeout on missing messages, the latter will be used instead. The parameter is optional; if
1146    ///   a negative value is supplied, the Server configured timeout on missing messages will be applied.
1147    ///   This timeout is ignored for the special "UNORDERED_MESSAGES" sequence, although a server-side
1148    ///   timeout on missing messages still applies.
1149    /// * `listener`: an object suitable for receiving notifications about the processing outcome. The
1150    ///   parameter is optional; if not supplied, no notification will be available.
1151    /// * `enqueue_while_disconnected`: if this flag is set to `true`, and the client is in a disconnected
1152    ///   status when the provided message is handled, then the message is not aborted right away but
1153    ///   is queued waiting for a new session. Note that the message can still be aborted later when
1154    ///   a new session is established.
1155    pub fn send_message(
1156        &mut self,
1157        message: &str,
1158        sequence: Option<&str>,
1159        _delay_timeout: Option<u64>,
1160        listener: Option<Box<dyn ClientMessageListener>>,
1161        enqueue_while_disconnected: bool,
1162    ) {
1163        let _sequence = sequence.unwrap_or_else(|| "UNORDERED_MESSAGES");
1164
1165        // Handle the message based on the current connection status
1166        match &self.status {
1167            ClientStatus::Connected(_connection_type) => {
1168                // Send the message to the server in a separate thread
1169                // ...
1170            }
1171            ClientStatus::Disconnected(_disconnection_type) => {
1172                if enqueue_while_disconnected {
1173                    // Enqueue the message to be sent when a connection is available
1174                    // ...
1175                } else {
1176                    // Abort the message and notify the listener
1177                    if let Some(listener) = listener {
1178                        listener.on_abort(message, false);
1179                    }
1180                }
1181            }
1182            _ => {
1183                // Enqueue the message to be sent when a connection is available
1184                // ...
1185            }
1186        }
1187        unimplemented!("Complete mechanism to send message to LightstreamerClient.");
1188    }
1189
1190    /// Static method that permits to configure the logging system used by the library. The logging
1191    /// system must respect the `LoggerProvider` interface. A custom class can be used to wrap any
1192    /// third-party logging system.
1193    ///
1194    /// If no logging system is specified, all the generated log is discarded.
1195    ///
1196    /// The following categories are available to be consumed:
1197    ///
1198    /// - `lightstreamer.stream`: logs socket activity on Lightstreamer Server connections; at INFO
1199    ///   level, socket operations are logged; at DEBUG level, read/write data exchange is logged.
1200    /// - `lightstreamer.protocol`: logs requests to Lightstreamer Server and Server answers; at INFO
1201    ///   level, requests are logged; at DEBUG level, request details and events from the Server are logged.
1202    /// - `lightstreamer.session`: logs Server Session lifecycle events; at INFO level, lifecycle events
1203    ///   are logged; at DEBUG level, lifecycle event details are logged.
1204    /// - `lightstreamer.subscriptions`: logs subscription requests received by the clients and the related
1205    ///   updates; at WARN level, alert events from the Server are logged; at INFO level, subscriptions
1206    ///   and unsubscriptions are logged; at DEBUG level, requests batching and update details are logged.
1207    /// - `lightstreamer.actions`: logs settings / API calls.
1208    ///
1209    /// # Parameters
1210    ///
1211    /// * `provider`: A `LoggerProvider` instance that will be used to generate log messages by the
1212    ///   library classes.
1213    pub fn set_logger_provider() {
1214        unimplemented!("Implement mechanism to set logger provider for LightstreamerClient.");
1215    }
1216    /*
1217    pub fn set_logger_provider(provider: LoggerProvider) {
1218        // Implementation for set_logger_provider
1219    }
1220    */
1221
1222    /// Provides a mean to control the way TLS certificates are evaluated, with the possibility to
1223    /// accept untrusted ones.
1224    ///
1225    /// May be called only once before creating any `LightstreamerClient` instance.
1226    ///
1227    /// # Parameters
1228    ///
1229    /// * `factory`: an instance of `ssl.SSLContext`
1230    ///
1231    /// # Raises
1232    ///
1233    /// * `IllegalArgumentException`: if the factory is `None`
1234    /// * `IllegalStateException`: if a factory is already installed
1235    pub fn set_trust_manager_factory() {
1236        unimplemented!("Implement mechanism to set trust manager factory for LightstreamerClient.");
1237    }
1238    /*
1239    pub fn set_trust_manager_factory(factory: Option<SslContext>) -> Result<(), IllegalArgumentException> {
1240        if factory.is_none() {
1241            return Err(IllegalArgumentException::new(
1242                "Factory cannot be None",
1243            ));
1244        }
1245
1246        // Implementation for set_trust_manager_factory
1247        Ok(())
1248    }
1249    */
1250
1251    /// Adds a subscription to the `LightstreamerClient` instance.
1252    ///
1253    /// Active subscriptions are subscribed to through the server as soon as possible (i.e. as soon
1254    /// as there is a session available). Active `Subscription` are automatically persisted across different
1255    /// sessions as long as a related unsubscribe call is not issued.
1256    ///
1257    /// Subscriptions can be given to the `LightstreamerClient` at any time. Once done the `Subscription`
1258    /// immediately enters the "active" state.
1259    ///
1260    /// Once "active", a `Subscription` instance cannot be provided again to a `LightstreamerClient`
1261    /// unless it is first removed from the "active" state through a call to `unsubscribe()`.
1262    ///
1263    /// Also note that forwarding of the subscription to the server is made in a separate thread.
1264    ///
1265    /// A successful subscription to the server will be notified through a `SubscriptionListener.onSubscription()`
1266    /// event.
1267    ///
1268    /// # Parameters
1269    ///
1270    /// * `subsrciption_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1271    /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1272    ///   values.
1273    ///
1274    /// See also `unsubscribe()`
1275    pub fn subscribe(subscription_sender: Sender<SubscriptionRequest>, subscription: Subscription) {
1276        subscription_sender
1277            .try_send(SubscriptionRequest {
1278                subscription: Some(subscription),
1279                subscription_id: None,
1280            })
1281            .unwrap()
1282    }
1283
1284    /// If you want to be able to unsubscribe from a subscription, you need to keep track of the id
1285    /// of the subscription. This blocking method allows you to wait for the id of the subscription
1286    /// to be returned.
1287    /// 
1288    /// # Parameters
1289    /// 
1290    /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1291    /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1292    /// 
1293    pub async fn subscribe_get_id(
1294        subscription_sender: Sender<SubscriptionRequest>,
1295        subscription: Subscription,
1296    ) -> Result<usize, Box<dyn Error + Send + Sync>> {
1297        let mut id_receiver = subscription.id_receiver.clone();
1298        LightstreamerClient::subscribe(subscription_sender.clone(), subscription);
1299
1300        match id_receiver.changed().await {
1301            Ok(_) => Ok(*id_receiver.borrow()),
1302            Err(_) => Err(Box::new(IllegalStateException::new(
1303                "Failed to get subscription id",
1304            ))),
1305        }
1306    }
1307
1308    /// Operation method that removes a `Subscription` that is currently in the "active" state.
1309    ///
1310    /// By bringing back a `Subscription` to the "inactive" state, the unsubscription from all its
1311    /// items is requested to Lightstreamer Server.
1312    ///
1313    /// Subscription can be unsubscribed from at any time. Once done the `Subscription` immediately
1314    /// exits the "active" state.
1315    ///
1316    /// Note that forwarding of the unsubscription to the server is made in a separate thread.
1317    ///
1318    /// The unsubscription will be notified through a `SubscriptionListener.onUnsubscription()` event.
1319    ///
1320    /// # Parameters
1321    ///
1322    /// * `subsrciption_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1323    /// * `subscription_id`: The id of the subscription to be unsubscribed from.
1324    ///   instance.
1325    pub fn unsubscribe(
1326        subscription_sender: Sender<SubscriptionRequest>,
1327        subscription_id: usize,
1328    ) {
1329        println!("Unsubscribing subscription with id: {}", subscription_id);
1330        subscription_sender
1331            .try_send(SubscriptionRequest {
1332                subscription: None,
1333                subscription_id: Some(subscription_id),
1334            })
1335            .unwrap()
1336    }
1337
1338    /// Method setting enum for the logging of this instance.
1339    ///
1340    /// Default logging type is StdLogs, corresponding to `stdout`
1341    ///
1342    /// `LightstreamerClient` has methods for logging that are compatible with the `Tracing` crate.
1343    /// Enabling logging for the `Tracing` crate requires implementation of a tracing subscriber
1344    /// and its configuration and formatting.
1345    ///
1346    /// # Parameters
1347    ///
1348    /// * `logging`: An enum declaring the logging type of this `LightstreamerClient` instance.
1349    pub fn set_logging_type(&mut self, logging: LogType) {
1350        self.logging = logging;
1351    }
1352
1353    /// Method for logging messages
1354    ///
1355    /// Match case wraps log types. `loglevel` param ignored in StdLogs case, all output to stdout.
1356    ///
1357    /// # Parameters
1358    ///
1359    /// * `loglevel` Enum determining use of stdout or Tracing subscriber.
1360    pub fn make_log(&mut self, loglevel: Level, log: &str) {
1361        match self.logging {
1362            LogType::StdLogs => {
1363                println!("{}", log);
1364            }
1365            LogType::TracingLogs => match loglevel {
1366                Level::INFO => {
1367                    info!(log);
1368                }
1369                Level::WARN => {
1370                    warn!(log);
1371                }
1372                Level::ERROR => {
1373                    error!(log);
1374                }
1375                Level::TRACE => {
1376                    trace!(log);
1377                }
1378                Level::DEBUG => {
1379                    debug!(log);
1380                }
1381            },
1382        }
1383    }
1384}
1385
/// The transport type to be used by the client.
///
/// Each variant narrows what the client may use compared with the `None` case, where no
/// transport is forced. If the selected kind of connection is not possible because of the
/// environment, the client will not connect at all.
#[derive(Debug, PartialEq)]
pub enum Transport {
    /// "WS": the Stream-Sense algorithm is enabled as in the `None` case, but the client will
    /// only use WebSocket based connections.
    Ws,
    /// "HTTP": the Stream-Sense algorithm is enabled as in the `None` case, but the client will
    /// only use HTTP based connections.
    Http,
    /// "WS-STREAMING": the Stream-Sense algorithm is disabled and the client will only connect
    /// on Streaming over WebSocket.
    WsStreaming,
    /// "HTTP-STREAMING": the Stream-Sense algorithm is disabled and the client will only connect
    /// on Streaming over HTTP.
    HttpStreaming,
    /// "WS-POLLING": the Stream-Sense algorithm is disabled and the client will only connect on
    /// Polling over WebSocket.
    WsPolling,
    /// "HTTP-POLLING": the Stream-Sense algorithm is disabled and the client will only connect
    /// on Polling over HTTP.
    HttpPolling,
}