lightstreamer_rs/client/implementation.rs
1use crate::subscription::{ItemUpdate, Snapshot, Subscription, SubscriptionMode};
2
3use crate::client::Transport;
4pub(crate) use crate::client::listener::ClientListener;
5use crate::client::message_listener::ClientMessageListener;
6use crate::client::model::{ClientStatus, DisconnectionType, LogType};
7use crate::client::request::SubscriptionRequest;
8use crate::client::utils::get_subscription_by_id;
9use crate::connection::{ConnectionDetails, ConnectionOptions};
10use crate::connection::{ConnectionManager, ConnectionState, HeartbeatConfig, ReconnectionConfig};
11use crate::utils::{LightstreamerError, clean_message, parse_arguments};
12use cookie::Cookie;
13use futures_util::{SinkExt, StreamExt};
14use std::collections::HashMap;
15use std::fmt::{self, Debug, Formatter};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::mpsc::channel;
19use tokio::sync::{
20 Mutex, Notify,
21 mpsc::{Receiver, Sender},
22};
23use tokio::time::Instant as TokioInstant;
24use tokio_tungstenite::{
25 connect_async,
26 tungstenite::{
27 Message,
28 http::{HeaderName, HeaderValue, Request},
29 },
30};
31use tracing::{Level, debug, error, info, instrument, trace, warn};
32use url::Url;
33
34/// Facade class for the management of the communication to Lightstreamer Server. Used to provide
35/// configuration settings, event handlers, operations for the control of the connection lifecycle,
36/// Subscription handling and to send messages.
37///
38/// An instance of `LightstreamerClient` handles the communication with Lightstreamer Server on a
39/// specified endpoint. Hence, it hosts one "Session"; or, more precisely, a sequence of Sessions,
40/// since any Session may fail and be recovered, or it can be interrupted on purpose. So, normally,
41/// a single instance of `LightstreamerClient` is needed.
42///
43/// However, multiple instances of `LightstreamerClient` can be used, toward the same or multiple
44/// endpoints.
45///
46/// You can listen to the events generated by a session by registering an event listener, such as
47/// `ClientListener` or `SubscriptionListener`. These listeners allow you to handle various events,
48/// such as session creation, connection status, subscription updates, and server messages. However,
49/// you should be aware that the event notifications are dispatched by a single thread, the so-called
50/// event thread. This means that if the operations of a listener are slow or blocking, they will
51/// delay the processing of the other listeners and affect the performance of your application.
52/// Therefore, you should delegate any slow or blocking operations to a dedicated thread, and keep
53/// the listener methods as fast and simple as possible. Note that even if you create multiple
54/// instances of `LightstreamerClient`, they will all use a single event thread, that is shared
55/// among them.
56///
57/// # Parameters
58///
59/// * `server_address`: the address of the Lightstreamer Server to which this `LightstreamerClient`
60/// will connect to. It is possible to specify it later by using `None` here. See
61/// `ConnectionDetails.setServerAddress()` for details.
62/// * `adapter_set`: the name of the Adapter Set mounted on Lightstreamer Server to be used to handle
63/// all requests in the Session associated with this `LightstreamerClient`. It is possible not to
64/// specify it at all or to specify it later by using `None` here. See `ConnectionDetails.setAdapterSet()`
65/// for details.
66///
67/// # Raises
68///
69/// * `IllegalArgumentException`: if a not valid address is passed. See `ConnectionDetails.setServerAddress()`
70/// for details.
71pub struct LightstreamerClient {
72 /// The address of the Lightstreamer Server to which this `LightstreamerClient` will connect.
73 server_address: Option<String>,
74 /// The name of the Adapter Set mounted on Lightstreamer Server to be used to handle all
75 /// requests in the Session associated with this `LightstreamerClient`.
76 adapter_set: Option<String>,
77 /// Data object that contains the details needed to open a connection to a Lightstreamer Server.
78 /// This instance is set up by the `LightstreamerClient` object at its own creation. Properties
79 /// of this object can be overwritten by values received from a Lightstreamer Server.
80 pub connection_details: ConnectionDetails,
81 /// Data object that contains options and policies for the connection to the server. This instance
82 /// is set up by the `LightstreamerClient` object at its own creation. Properties of this object
83 /// can be overwritten by values received from a Lightstreamer Server.
84 pub connection_options: ConnectionOptions,
85 /// A list of listeners that will receive events from the `LightstreamerClient` instance.
86 listeners: Vec<Box<dyn ClientListener>>,
87 /// A list containing all the `Subscription` instances that are currently "active" on this
88 /// `LightstreamerClient`.
89 subscriptions: Vec<Subscription>,
90 /// The current status of the client.
91 status: ClientStatus,
92 /// Logging Type to be used
93 logging: LogType,
94 /// The sender that can be used to subscribe/unsubscribe
95 pub subscription_sender: Sender<SubscriptionRequest>,
96 /// The receiver used for subscribe/unsubsribe
97 subscription_receiver: Receiver<SubscriptionRequest>,
98 /// Connection manager for automatic reconnection
99 connection_manager: Option<Arc<ConnectionManager>>,
100 /// Configuration for reconnection behavior
101 reconnection_config: ReconnectionConfig,
102 /// Configuration for heartbeat monitoring
103 heartbeat_config: HeartbeatConfig,
104 /// Whether automatic reconnection is enabled
105 auto_reconnect_enabled: bool,
106}
107
108impl Debug for LightstreamerClient {
109 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
110 f.debug_struct("LightstreamerClient")
111 .field("server_address", &self.server_address)
112 .field("adapter_set", &self.adapter_set)
113 .field("connection_details", &self.connection_details)
114 .field("connection_options", &self.connection_options)
115 .field("listeners", &self.listeners)
116 .field("subscriptions", &self.subscriptions)
117 .finish()
118 }
119}
120
121impl LightstreamerClient {
122 /// A constant string representing the name of the library.
123 pub const LIB_NAME: &'static str = "rust_client";
124
125 /// A constant string representing the version of the library.
126 pub const LIB_VERSION: &'static str = "0.1.0";
127
128 //
129 // Constants for WebSocket connection.
130 //
131 /// WebSocket key used in the WebSocket handshake process.
132 /// This is a base64-encoded value that is used to establish the WebSocket connection.
133 pub const SEC_WEBSOCKET_KEY: &'static str = "PNDUibe9ex7PnsrLbt0N4w==";
134
135 /// WebSocket protocol identifier for the TLCP protocol used by Lightstreamer.
136 /// This identifies the specific subprotocol used over the WebSocket connection.
137 pub const SEC_WEBSOCKET_PROTOCOL: &'static str = "TLCP-2.4.0.lightstreamer.com";
138
139 /// WebSocket version used for the connection.
140 /// Version 13 is the standard version used in modern WebSocket implementations.
141 pub const SEC_WEBSOCKET_VERSION: &'static str = "13";
142
143 /// WebSocket upgrade header value.
144 /// Used to indicate that the client wishes to upgrade from HTTP to WebSocket protocol.
145 pub const SEC_WEBSOCKET_UPGRADE: &'static str = "websocket";
146
147 /// A constant string representing the version of the TLCP protocol used by the library.
148 pub const TLCP_VERSION: &'static str = "TLCP-2.4.0";
149
150 /// Grace period (milliseconds) added on top of the keepalive interval before
151 /// the connection is considered dead by the liveness watchdog.
152 ///
153 /// The server guarantees a message (data or `probe`) at least every keepalive
154 /// interval; the grace absorbs network jitter and scheduling delays.
155 pub const KEEPALIVE_GRACE_MS: u64 = 5_000;
156
157 /// Static method that can be used to share cookies between connections to the Server (performed by
158 /// this library) and connections to other sites that are performed by the application. With this
159 /// method, cookies received by the application can be added (or replaced if already present) to
160 /// the cookie set used by the library to access the Server. Obviously, only cookies whose domain
161 /// is compatible with the Server domain will be used internally.
162 ///
163 /// This method should be invoked before calling the `LightstreamerClient.connect()` method.
164 /// However it can be invoked at any time; it will affect the internal cookie set immediately
165 /// and the sending of cookies on the next HTTP request or WebSocket establishment.
166 ///
167 /// # Parameters
168 ///
169 /// * `uri`: the URI from which the supplied cookies were received. It cannot be `None`.
170 /// * `cookies`: an instance of `http.cookies.SimpleCookie`.
171 ///
172 /// See also `getCookies()`
173 pub fn add_cookies(_uri: &str, _cookies: &Cookie) {
174 // Implementation for add_cookies
175 unimplemented!("Implement mechanism to add cookies to LightstreamerClient");
176 }
177
178 /// Adds a listener that will receive events from the `LightstreamerClient` instance.
179 ///
180 /// The same listener can be added to several different `LightstreamerClient` instances.
181 ///
182 /// A listener can be added at any time. A call to add a listener already present will be ignored.
183 ///
184 /// # Parameters
185 ///
186 /// * `listener`: An object that will receive the events as documented in the `ClientListener`
187 /// interface.
188 ///
189 /// See also `removeListener()`
190 pub fn add_listener(&mut self, listener: Box<dyn ClientListener>) {
191 self.listeners.push(listener);
192 }
193
194 /// Packs s string with the necessary parameters for a subscription request.
195 ///
196 /// # Parameters
197 ///
198 /// * `subscription`: The subscription for which to get the parameters.
199 /// * `request_id`: The request ID to use in the parameters.
200 ///
201 fn get_subscription_params(
202 subscription: &Subscription,
203 request_id: usize,
204 ) -> Result<String, LightstreamerError> {
205 let ls_req_id = request_id.to_string();
206 let ls_sub_id = subscription.id.to_string();
207 let ls_mode = subscription.get_mode().to_string();
208 let ls_group = match subscription.get_item_group() {
209 Some(item_group) => item_group.to_string(),
210 None => match subscription.get_items() {
211 Some(items) => items.join(" "),
212 None => {
213 return Err(LightstreamerError::invalid_argument(
214 "No item group or items found in subscription.",
215 ));
216 }
217 },
218 };
219 let ls_schema = match subscription.get_field_schema() {
220 Some(field_schema) => field_schema.to_string(),
221 None => match subscription.get_fields() {
222 Some(fields) => fields.join(" "),
223 None => {
224 return Err(LightstreamerError::invalid_argument(
225 "No field schema or fields found in subscription.",
226 ));
227 }
228 },
229 };
230 let ls_data_adapter = match subscription.get_data_adapter() {
231 Some(data_adapter) => data_adapter.to_string(),
232 None => "".to_string(),
233 };
234 let ls_snapshot = subscription
235 .get_requested_snapshot()
236 .unwrap_or_default()
237 .to_string();
238 //
239 // Prepare the subscription request.
240 //
241 let mut params: Vec<(&str, &str)> = vec![
242 ("LS_data_adapter", &ls_data_adapter),
243 ("LS_reqId", &ls_req_id),
244 ("LS_op", "add"),
245 ("LS_subId", &ls_sub_id),
246 ("LS_mode", &ls_mode),
247 ("LS_group", &ls_group),
248 ("LS_schema", &ls_schema),
249 ("LS_ack", "false"),
250 ];
251 // Remove the data adapter parameter if not specified.
252 if ls_data_adapter.is_empty() {
253 params.remove(0);
254 }
255 if !ls_snapshot.is_empty() {
256 params.push(("LS_snapshot", &ls_snapshot));
257 }
258
259 Ok(serde_urlencoded::to_string(¶ms)?)
260 }
261
262 fn get_unsubscription_params(
263 subscription_id: usize,
264 request_id: usize,
265 ) -> Result<String, LightstreamerError> {
266 let ls_req_id = request_id.to_string();
267 let ls_sub_id = subscription_id.to_string();
268 //
269 // Prepare the unsubscription request.
270 //
271 let params: Vec<(&str, &str)> = vec![
272 ("LS_reqId", &ls_req_id),
273 ("LS_op", "delete"),
274 ("LS_subId", &ls_sub_id),
275 ];
276
277 Ok(serde_urlencoded::to_string(¶ms)?)
278 }
279
280 /// Operation method that requests to open a Session against the configured Lightstreamer Server.
281 ///
282 /// When `connect()` is called, unless a single transport was forced through `ConnectionOptions.setForcedTransport()`,
283 /// the so called "Stream-Sense" mechanism is started: if the client does not receive any answer
284 /// for some seconds from the streaming connection, then it will automatically open a polling
285 /// connection.
286 ///
287 /// A polling connection may also be opened if the environment is not suitable for a streaming
288 /// connection.
289 ///
290 /// Note that as "polling connection" we mean a loop of polling requests, each of which requires
291 /// opening a synchronous (i.e. not streaming) connection to Lightstreamer Server.
292 ///
293 /// Note that the request to connect is accomplished by the client in a separate thread; this
294 /// means that an invocation to `getStatus()` right after `connect()` might not reflect the change
295 /// yet.
296 ///
297 /// When the request to connect is finally being executed, if the current status of the client
298 /// is not `DISCONNECTED`, then nothing will be done.
299 ///
300 /// # Raises
301 ///
302 /// * `IllegalStateException`: if no server address was configured.
303 ///
304 /// See also `getStatus()`
305 ///
306 /// See also `disconnect()`
307 ///
308 /// See also `ClientListener.onStatusChange()`
309 ///
310 /// See also `ConnectionDetails.setServerAddress()`
311 #[instrument(level = "trace")]
312 pub async fn connect(
313 client: Arc<Mutex<Self>>,
314 shutdown_signal: Arc<Notify>,
315 ) -> Result<(), LightstreamerError> {
316 // If auto-reconnect is enabled, use ConnectionManager
317 let auto_reconnect_enabled = {
318 let client_guard = client.lock().await;
319 client_guard.auto_reconnect_enabled
320 };
321
322 if auto_reconnect_enabled {
323 let (reconnection_config, heartbeat_config) = {
324 let client_guard = client.lock().await;
325 (
326 client_guard.reconnection_config.clone(),
327 client_guard.heartbeat_config.clone(),
328 )
329 };
330
331 let weak_client = Arc::downgrade(&client);
332 let connection_manager: Arc<ConnectionManager> =
333 ConnectionManager::new(weak_client, reconnection_config, heartbeat_config);
334
335 {
336 let mut client_guard = client.lock().await;
337 client_guard.connection_manager = Some(connection_manager.clone());
338 }
339
340 // Use direct connection method
341 return client.lock().await.connect_direct(shutdown_signal).await;
342 }
343
344 // Fallback to direct connection without auto-reconnect
345 let mut client_guard = client.lock().await;
346 client_guard.connect_direct(shutdown_signal).await
347 }
348
349 /// Direct connection method without automatic reconnection
350 #[instrument(level = "trace")]
351 pub async fn connect_direct(
352 &mut self,
353 shutdown_signal: Arc<Notify>,
354 ) -> Result<(), LightstreamerError> {
355 // Check if the server address is configured.
356 if self.server_address.is_none() {
357 return Err(LightstreamerError::invalid_state(
358 "No server address was configured.",
359 ));
360 }
361 //
362 // Only WebSocket streaming transport is currently supported.
363 //
364 let forced_transport = self.connection_options.get_forced_transport();
365 if forced_transport.is_none_or(|t| *t != Transport::WsStreaming) {
366 return Err(LightstreamerError::invalid_state(
367 "Only WebSocket streaming transport is currently supported.",
368 ));
369 }
370 //
371 // Convert the HTTP URL to a WebSocket URL.
372 //
373 let http_url = self
374 .connection_details
375 .get_server_address()
376 .ok_or_else(|| LightstreamerError::invalid_state("Server address is not set."))?;
377 let mut url = Url::parse(http_url).map_err(|e| {
378 LightstreamerError::invalid_state(format!("Failed to parse server address URL: {}", e))
379 })?;
380 match url.scheme() {
381 "http" => url.set_scheme("ws").map_err(|()| {
382 LightstreamerError::invalid_state("Failed to set scheme to ws for WebSocket URL.")
383 })?,
384 "https" => url.set_scheme("wss").map_err(|()| {
385 LightstreamerError::invalid_state("Failed to set scheme to wss for WebSocket URL.")
386 })?,
387 invalid_scheme => {
388 return Err(LightstreamerError::invalid_state(format!(
389 "Unsupported scheme '{}' found when converting HTTP URL to WebSocket URL.",
390 invalid_scheme
391 )));
392 }
393 }
394 let ws_url = url.as_str();
395
396 // Build the WebSocket request with the necessary headers.
397 let request = Request::builder()
398 .uri(ws_url)
399 .header(
400 HeaderName::from_static("connection"),
401 HeaderValue::from_static("Upgrade"),
402 )
403 .header(
404 HeaderName::from_static("host"),
405 HeaderValue::from_str(url.host_str().unwrap_or("localhost")).map_err(|err| {
406 LightstreamerError::invalid_state(format!(
407 "Invalid header value for header with name 'host': {}",
408 err
409 ))
410 })?,
411 )
412 .header(
413 HeaderName::from_static("sec-websocket-key"),
414 HeaderValue::from_static(Self::SEC_WEBSOCKET_KEY),
415 )
416 .header(
417 HeaderName::from_static("sec-websocket-protocol"),
418 HeaderValue::from_static(Self::SEC_WEBSOCKET_PROTOCOL),
419 )
420 .header(
421 HeaderName::from_static("sec-websocket-version"),
422 HeaderValue::from_static(Self::SEC_WEBSOCKET_VERSION),
423 )
424 .header(
425 HeaderName::from_static("upgrade"),
426 HeaderValue::from_static(Self::SEC_WEBSOCKET_UPGRADE),
427 )
428 .body(())?;
429
430 // Connect to the Lightstreamer server using WebSocket.
431 let ws_stream = match connect_async(request).await {
432 Ok((ws_stream, response)) => {
433 if let Some(server_header) = response.headers().get("server") {
434 self.make_log(
435 Level::INFO,
436 &format!(
437 "Connected to Lightstreamer server: {}",
438 server_header.to_str().unwrap_or("")
439 ),
440 );
441 } else {
442 self.make_log(Level::INFO, "Connected to Lightstreamer server");
443 }
444 ws_stream
445 }
446 Err(err) => {
447 return Err(LightstreamerError::connection(format!(
448 "Failed to connect to Lightstreamer server with WebSocket: {}",
449 err
450 )));
451 }
452 };
453
454 // Split the WebSocket stream into a write and a read stream.
455 let (mut write_stream, mut read_stream) = ws_stream.split();
456
457 //
458 // Initiate communication with the server by sending a 'wsok' message.
459 //
460 write_stream.send(Message::Text("wsok".into())).await?;
461
462 //
463 // Start reading and processing messages from the server.
464 //
465 let mut is_connected = false;
466 let mut request_id: usize = 0;
467 let mut session_id: Option<String> = None;
468 let mut subscription_id: usize = 0;
469 let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> =
470 HashMap::new();
471
472 //
473 // Liveness enforcement and reverse heartbeat.
474 //
475 // The server guarantees that some message (data or `probe`) is sent at
476 // least every keepalive interval. If nothing arrives within the
477 // effective keepalive window plus a grace period, the connection is
478 // considered dead (half-open TCP) and an error is returned so the
479 // caller can reconnect. The window starts from the configured
480 // keepalive interval (if any) and is widened to the server-declared
481 // keepalive from the `conok` message once known.
482 //
483 let configured_keepalive_ms = self.connection_options.get_keepalive_interval();
484 let reverse_heartbeat_ms = self.connection_options.get_reverse_heartbeat_interval();
485 let mut keepalive_timeout_ms: u64 = configured_keepalive_ms;
486 let mut last_message_at = TokioInstant::now();
487
488 let mut heartbeat_timer = if reverse_heartbeat_ms > 0 {
489 let mut timer = tokio::time::interval(Duration::from_millis(reverse_heartbeat_ms));
490 timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
491 Some(timer)
492 } else {
493 None
494 };
495
496 loop {
497 let liveness_deadline = last_message_at
498 + Duration::from_millis(
499 keepalive_timeout_ms.saturating_add(Self::KEEPALIVE_GRACE_MS),
500 );
501 tokio::select! {
502 message = read_stream.next() => {
503 last_message_at = TokioInstant::now();
504 match message {
505 Some(Ok(Message::Text(text))) => {
506 // Messages could include multiple submessages separated by /r/n.
507 // Split the message into submessages and process each one separately.
508 let submessages: Vec<&str> = text.split("\r\n")
509 .filter(|&line| !line.trim().is_empty()) // Filter out empty lines.
510 .collect();
511 for submessage in submessages {
512 let clean_text = clean_message(submessage);
513 let submessage_fields: Vec<&str> = clean_text.split(",").collect();
514 match *submessage_fields.first().unwrap_or(&"") {
515 //
516 // Errors from server.
517 //
518 //
519 // Session-level error: the session is not usable — fail the
520 // connection so the caller can reconnect. (The previous
521 // `break` only exited the submessage loop, leaving a dead
522 // session looping forever.)
523 //
524 "conerr" => {
525 self.make_log( Level::ERROR, &format!("Received connection error from Lightstreamer server: {}", clean_text) );
526 return Err(LightstreamerError::connection(format!(
527 "connection error from server: {}",
528 clean_text
529 )));
530 },
531 //
532 // Request-level error: the session survives — log and continue.
533 //
534 "reqerr" => {
535 self.make_log( Level::ERROR, &format!("Received request error from Lightstreamer server: {}", clean_text) );
536 },
537 //
538 // Session created successfully.
539 //
540 "conok" => {
541 is_connected = true;
542 // CONOK,<session-ID>,<request-limit>,<keep-alive>,<control-link>
543 // Widen the liveness window to the server-declared keepalive
544 // so enforcement works even when no keepalive was configured.
545 if let Some(server_keepalive_ms) = submessage_fields
546 .get(3)
547 .and_then(|v| v.trim().parse::<u64>().ok())
548 .filter(|v| *v > 0)
549 {
550 keepalive_timeout_ms = widen_keepalive_timeout(
551 keepalive_timeout_ms,
552 server_keepalive_ms,
553 );
554 self.make_log( Level::DEBUG, &format!("Server-declared keepalive interval: {} ms", server_keepalive_ms) );
555 }
556 if let Some(conok_session_id) = submessage_fields.get(1) {
557 session_id = Some((*conok_session_id).to_string());
558 self.make_log( Level::DEBUG, &format!("Session creation confirmed by server: {}", clean_text) );
559 self.make_log( Level::DEBUG, &format!("Session created with ID: {:?}", conok_session_id) );
560 //
561 // Subscribe to the desired items.
562 //
563 while let Some(subscription) = self.subscriptions.get_mut(subscription_id) {
564 //
565 // Gather all the necessary subscription parameters.
566 //
567 subscription_id += 1;
568 request_id += 1;
569 subscription.id = subscription_id;
570 subscription.id_sender.try_send(subscription_id)?;
571
572 let encoded_params = match Self::get_subscription_params(subscription, request_id)
573 {
574 Ok(params) => params,
575 Err(err) => {
576 return Err(err);
577 },
578 };
579
580 write_stream
581 .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
582 .await?;
583 debug!("Sent subscription request: '{}'", encoded_params);
584 }
585 } else {
586 return Err(LightstreamerError::protocol(
587 "Session ID not found in 'conok' message from server",
588 ));
589 }
590 },
591 //
592 // Notifications from server.
593 //
594 "conf" | "cons" | "clientip" | "servname" | "prog" | "sync" | "eos" => {
595 self.make_log( Level::INFO, &format!("Received notification from server: {}", clean_text) );
596 // Don't do anything with these notifications for now.
597 },
598 "probe" => {
599 self.make_log( Level::DEBUG, &format!("Received probe message from server: {}", clean_text ) );
600 },
601 "reqok" => {
602 self.make_log( Level::DEBUG, &format!("Received reqok message from server: '{}'", clean_text ) );
603 },
604 //
605 // Subscription confirmation from server.
606 //
607 "subok" => {
608 self.make_log( Level::INFO, &format!("Subscription confirmed by server: '{}'", clean_text) );
609 },
610 //
611 // Usubscription confirmation from server.
612 //
613 "unsub" => {
614 self.make_log( Level::INFO, &format!("Unsubscription confirmed by server: '{}'", clean_text) );
615 },
616 //
617 // Data updates from server.
618 //
619 "u" => {
620 // Parse arguments from the received message.
621 let arguments = parse_arguments(&clean_text);
622 //
623 // Extract the subscription from the first argument.
624 //
625 let subscription_index = arguments.get(1).unwrap_or(&"").parse::<usize>().unwrap_or(0);
626 let subscription = match get_subscription_by_id(self.get_subscriptions(), subscription_index) {
627 Some(subscription) => subscription,
628 None => {
629 self.make_log( Level::WARN, &format!("Subscription not found for index: {}", subscription_index) );
630 continue;
631
632 }
633 };
634 //
635 // Extract the item from the second argument.
636 //
637 let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0);
638 let item = match subscription.get_items() {
639 Some(items) => items.get(item_index-1),
640 None => {
641 self.make_log( Level::WARN, &format!("No items found in subscription: {:?}", subscription) );
642 continue;
643 }
644 };
645 //
646 // Determine if the update is a snapshot or real-time update based on the subscription parameters.
647 //
648 let is_snapshot = match subscription.get_requested_snapshot() {
649 Some(ls_snapshot) => {
650 match ls_snapshot {
651 Snapshot::No => false,
652 Snapshot::Yes => {
653 match subscription.get_mode() {
654 SubscriptionMode::Merge => {
655 if arguments.len() == 4 && arguments[3] == "$" {
656 // EOS notification received
657 true
658 } else {
659 // If item doesn't exist in item_updates yet, the first update
660 // is always a snapshot.
661 if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) {
662 if item_updates.get(&(item_index)).is_some() {
663 // Item update already exists in item_updates, so it's not a snapshot.
664 false
665 } else {
666 // Item update doesn't exist in item_updates, so the first update is always a snapshot.
667 true
668 }
669 } else {
670 // Item updates not found for subscription, so the first update is always a snapshot.
671 true
672 }
673 }
674 },
675 SubscriptionMode::Distinct | SubscriptionMode::Command => {
676 !subscription.is_subscribed()
677 },
678 _ => false,
679 }
680 },
681 _ => false,
682 }
683 },
684 None => false,
685 };
686
687 // Extract the field values from the third argument.
688 let field_values: Vec<&str> = arguments.get(3).unwrap_or(&"").split('|').collect();
689
690 //
691 // Get fields from subscription and create a HashMap of field names and values.
692 //
693 let subscription_fields = subscription.get_fields();
694 let mut field_map: HashMap<String, Option<String>> = subscription_fields
695 .map(|fields| fields.iter().map(|field_name| (field_name.to_string(), None)).collect())
696 .unwrap_or_default();
697
698 let mut field_index = 0;
699 for value in field_values {
700 match value {
701 "" => {
702 // An empty value means the field is unchanged compared to the previous update of the same field.
703 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
704 field_map.insert(field_name.to_string(), None);
705 }
706 field_index += 1;
707 }
708 "#" | "$" => {
709 // A value corresponding to a hash sign "#" or dollar sign "$" means the field is null or empty.
710 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
711 field_map.insert(field_name.to_string(), Some("".to_string()));
712 }
713 field_index += 1;
714 }
715 value if value.starts_with('^') => {
716 let command = value.chars().nth(1).unwrap_or(' ');
717 match command {
718 '0'..='9' => {
719 let count = value[1..].parse().unwrap_or(0);
720 for i in 0..count {
721 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index + i)) {
722 field_map.insert(field_name.to_string(), None);
723 }
724 }
725 field_index += count;
726 }
727 'P' | 'T' => {
728 let diff_value = serde_urlencoded::from_str(&value[2..]).unwrap_or_else(|_| value[2..].to_string());
729 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index))
730 && let Some(prev_value) = field_map.get(field_name).and_then(|v| v.as_ref()) {
731 let new_value = match command {
732 'P' => {
733 // Apply JSON Patch
734 let patch: serde_json::Value = serde_json::from_str(&diff_value).unwrap_or(serde_json::Value::Null);
735 let mut prev_json: serde_json::Value = serde_json::from_str(prev_value).unwrap_or(serde_json::Value::Null);
736 let patch_operations: Vec<json_patch::PatchOperation> = serde_json::from_value(patch).unwrap_or_default();
737 if let Err(e) = json_patch::patch(&mut prev_json, &patch_operations) {
738 tracing::warn!("Failed to apply JSON patch: {}", e);
739 }
740 prev_json.to_string()
741 }
742 'T' => {
743 // Apply TLCP-diff
744 //tlcp_diff::apply_diff(prev_value, &diff_value).unwrap_or_else(|_| prev_value.to_string())
745 unimplemented!("Implement TLCP-diff");
746 }
747 _ => unreachable!(),
748 };
749 field_map.insert(field_name.to_string(), Some(new_value.to_string()));
750 }
751 field_index += 1;
752 }
753 _ => {
754 let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
755 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
756 field_map.insert(field_name.to_string(), Some(decoded_value));
757 }
758 field_index += 1;
759 }
760 }
761 }
762 value if value.starts_with('{') => {
763 // in this case it is a json payload that we will let the consumer handle. In this case, it is important
764 // to preserve casing for parsing.
765 let original_json = parse_arguments(submessage).get(3).unwrap_or(&"").split('|').collect::<Vec<&str>>();
766 let mut payload = "";
767 for json in original_json.iter()
768 {
769 if json.is_empty() || *json == "#"
770 {
771 continue;
772 }
773
774 payload = json;
775 }
776
777 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
778 field_map.insert(field_name.to_string(), Some(payload.to_string()));
779 }
780 field_index += 1;
781 }
782 _ => {
783 let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
784 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
785 field_map.insert(field_name.to_string(), Some(decoded_value));
786 }
787 field_index += 1;
788 }
789 }
790 }
791
792 // Store only item_update's changed fields.
793 let changed_fields: HashMap<String, String> = field_map.iter()
794 .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
795 .collect();
796
797 //
798 // Take the proper item_update from item_updates and update it with changed fields.
799 // If the item_update doesn't exist yet, create a new one.
800 //
801 let current_item_update: ItemUpdate;
802 match subscription_item_updates.get_mut(&(subscription_index)) {
803 Some(item_updates) => match item_updates.get_mut(&(item_index)) {
804 Some(item_update) => {
805 //
806 // Iterate changed_fields and update existing item_update.fields assigning the new values.
807 //
808 for (field_name, new_value) in &changed_fields {
809 if item_update.fields.contains_key(field_name) {
810 item_update.fields.insert((*field_name).clone(), Some(new_value.clone()));
811 }
812 }
813 item_update.changed_fields = changed_fields.clone();
814 item_update.is_snapshot = is_snapshot;
815 current_item_update = item_update.clone();
816 },
817 None => {
818 // Create a new item_update and add it to item_updates.
819 let item_update = ItemUpdate {
820 item_name: item.cloned(),
821 item_pos: item_index,
822 fields: field_map.clone(),
823 changed_fields: changed_fields.clone(),
824 is_snapshot,
825 };
826 current_item_update = item_update.clone();
827 item_updates.insert(item_index, item_update);
828 }
829 },
830 None => {
831 // Create a new item_update and add it to item_updates.
832 let item_update = ItemUpdate {
833 item_name: item.cloned(),
834 item_pos: item_index,
835 fields: field_map,
836 changed_fields,
837 is_snapshot,
838 };
839 current_item_update = item_update.clone();
840 let mut item_updates = HashMap::new();
841 item_updates.insert(item_index, item_update);
842 subscription_item_updates.insert(subscription_index, item_updates);
843 }
844 };
845
846 // Get mutable subscription listeners directly.
847 let subscription_listeners = subscription.get_listeners();
848
849 // Iterate subscription listeners and call on_item_update for each listener.
850 for listener in subscription_listeners {
851 listener.on_item_update(¤t_item_update);
852 }
853 }
854 //
855 // Connection confirmation from server.
856 //
857 "wsok" => {
858 self.make_log( Level::INFO, &format!("Connection confirmed by server: '{}'", clean_text) );
859 //
860 // Request session creation.
861 //
862 let ls_adapter_set = match self.connection_details.get_adapter_set() {
863 Some(adapter_set) => adapter_set,
864 None => {
865 return Err(LightstreamerError::invalid_state(
866 "No adapter set found in connection details.",
867 ));
868 },
869 };
870 let ls_send_sync = self.connection_options.get_send_sync().to_string();
871 let ls_keepalive_millis = configured_keepalive_ms.to_string();
872 let ls_inactivity_millis = reverse_heartbeat_ms.to_string();
873 let mut params: Vec<(&str, &str)> = vec![
874 ("LS_adapter_set", ls_adapter_set),
875 ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"),
876 ("LS_send_sync", &ls_send_sync),
877 ];
878 if configured_keepalive_ms > 0 {
879 params.push(("LS_keepalive_millis", &ls_keepalive_millis));
880 }
881 if reverse_heartbeat_ms > 0 {
882 params.push(("LS_inactivity_millis", &ls_inactivity_millis));
883 }
884 if let Some(user) = &self.connection_details.get_user() {
885 params.push(("LS_user", user));
886 }
887 if let Some(password) = &self.connection_details.get_password() {
888 params.push(("LS_password", password));
889 }
890 params.push(("LS_protocol", Self::TLCP_VERSION));
891 let encoded_params = serde_urlencoded::to_string(¶ms)?;
892 write_stream
893 .send(Message::Text(format!("create_session\r\n{}\n", encoded_params).into()))
894 .await?;
895 self.make_log( Level::DEBUG, &format!("Sent create session request: '{}'", encoded_params) );
896 },
897 unexpected_message => {
898 return Err(LightstreamerError::protocol(format!(
899 "Unexpected message received from server: '{:?}' (full message: '{}')",
900 unexpected_message, clean_text
901 )));
902 },
903 }
904 }
905 },
906 Some(Ok(non_text_message)) => {
907 return Err(LightstreamerError::protocol(format!(
908 "Unexpected non-text message from server: {:?}",
909 non_text_message
910 )));
911 },
912 Some(Err(err)) => {
913 return Err(LightstreamerError::protocol(format!(
914 "Error reading message from server: {}",
915 err
916 )));
917 },
918 None => {
919 self.make_log( Level::DEBUG, "No more messages from server" );
920 break;
921 },
922 }
923 },
924 Some(subscription_request) = self.subscription_receiver.recv() => {
925 request_id += 1;
926 // Process subscription requests.
927 if let Some(subscription) = subscription_request.subscription
928 {
929 self.subscriptions.push(subscription);
930
931 // if we are not connected yet, we will subscribe later
932 if !is_connected {
933 continue;
934 }
935
936 subscription_id += 1;
937 // SAFETY: We just pushed a subscription, so last_mut() must return Some.
938 // Using debug_assert to catch invariant violations during development.
939 let sub = self.subscriptions.last_mut();
940 debug_assert!(sub.is_some(), "subscriptions.last_mut() returned None after push()");
941 if let Some(sub) = sub {
942 sub.id = subscription_id;
943 sub.id_sender.try_send(subscription_id)?;
944 }
945
946 // SAFETY: We just pushed a subscription, so last() must return Some.
947 // Extract encoded_params in a separate scope to avoid borrow across await.
948 let encoded_params = {
949 let last_sub = self.subscriptions.last();
950 debug_assert!(last_sub.is_some(), "subscriptions.last() returned None after push()");
951 let Some(last_sub) = last_sub else {
952 return Err(LightstreamerError::invalid_state(
953 "subscriptions.last() returned None immediately after push()"
954 ));
955 };
956 Self::get_subscription_params(last_sub, request_id)?
957 };
958
959 write_stream
960 .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
961 .await?;
962
963 self.make_log( Level::INFO, &format!("Sent subscription request: '{}'", encoded_params) );
964 }
965 // Process unsubscription requests.
966 else if let Some(unsubscription_id) = subscription_request.subscription_id
967 {
968 let encoded_params = match Self::get_unsubscription_params(unsubscription_id, request_id)
969 {
970 Ok(params) => params,
971 Err(err) => {
972 return Err(err);
973 },
974 };
975
976 write_stream
977 .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
978 .await?;
979
980 self.make_log( Level::INFO, &format!("Sent unsubscription request: '{}'", encoded_params) );
981
982 self.subscriptions.retain(|s| s.id != unsubscription_id);
983
984 if self.subscriptions.is_empty()
985 {
986 self.make_log( Level::INFO, "No more subscriptions, disconnecting" );
987 shutdown_signal.notify_one();
988 }
989 }
990 },
991 _ = shutdown_signal.notified() => {
992 self.make_log( Level::INFO, "Received shutdown signal" );
993 break;
994 },
995 //
996 // Liveness watchdog: nothing received within the keepalive
997 // window plus grace — the connection is half-open / dead.
998 //
999 _ = tokio::time::sleep_until(liveness_deadline), if keepalive_timeout_ms > 0 => {
1000 let idle_ms = keepalive_timeout_ms.saturating_add(Self::KEEPALIVE_GRACE_MS);
1001 self.make_log(
1002 Level::ERROR,
1003 &format!(
1004 "No message received from server within {} ms (keepalive + grace); connection considered dead",
1005 idle_ms
1006 ),
1007 );
1008 return Err(LightstreamerError::connection(format!(
1009 "no message received from server within {} ms (keepalive + grace)",
1010 idle_ms
1011 )));
1012 },
1013 //
1014 // Reverse heartbeat: prove client liveness to the server and
1015 // surface write errors promptly on a dead connection.
1016 //
1017 _ = async {
1018 match heartbeat_timer.as_mut() {
1019 Some(timer) => { timer.tick().await; },
1020 None => std::future::pending::<()>().await,
1021 }
1022 }, if is_connected && session_id.is_some() => {
1023 // TLCP requires every request to carry at least one
1024 // parameter — an empty body is rejected with
1025 // "error,67,Empty request unexpected".
1026 if let Some(id) = session_id.as_deref() {
1027 let params = serde_urlencoded::to_string([("LS_session", id)])?;
1028 write_stream
1029 .send(Message::Text(format!("heartbeat\r\n{}", params).into()))
1030 .await?;
1031 self.make_log(Level::DEBUG, "Sent reverse heartbeat to server");
1032 }
1033 },
1034 }
1035 }
1036
1037 Ok(())
1038 }
1039
1040 /// Operation method that requests to close the Session opened against the configured Lightstreamer
1041 /// Server (if any).
1042 ///
1043 /// When `disconnect()` is called, the "Stream-Sense" mechanism is stopped.
1044 ///
1045 /// Note that active `Subscription` instances, associated with this `LightstreamerClient` instance,
1046 /// are preserved to be re-subscribed to on future Sessions.
1047 ///
1048 /// Note that the request to disconnect is accomplished by the client in a separate thread; this
1049 /// means that an invocation to `getStatus()` right after `disconnect()` might not reflect the
1050 /// change yet.
1051 ///
1052 /// When the request to disconnect is finally being executed, if the status of the client is
1053 /// "DISCONNECTED", then nothing will be done.
1054 ///
1055 /// See also `connect()`
1056 #[instrument(level = "trace")]
1057 pub async fn disconnect(&mut self) {
1058 self.make_log(Level::INFO, "Disconnecting from Lightstreamer server");
1059
1060 // If auto-reconnect is enabled and we have a connection manager, stop it
1061 if self.auto_reconnect_enabled {
1062 if let Some(ref manager) = self.connection_manager {
1063 manager.shutdown().await;
1064 }
1065 self.connection_manager = None;
1066 }
1067
1068 // Update status to disconnected
1069 self.status = ClientStatus::Disconnected(DisconnectionType::WillRetry);
1070
1071 // Notify listeners about status change
1072 for listener in &self.listeners {
1073 listener.on_status_change(&self.status.to_string());
1074 }
1075 }
1076
1077 /// Static inquiry method that can be used to share cookies between connections to the Server
1078 /// (performed by this library) and connections to other sites that are performed by the application.
1079 /// With this method, cookies received from the Server can be extracted for sending through other
1080 /// connections, according with the URI to be accessed.
1081 ///
1082 /// See `addCookies()` for clarifications on when cookies are directly stored by the library and
1083 /// when not.
1084 ///
1085 /// # Parameters
1086 ///
1087 /// * `uri`: the URI to which the cookies should be sent, or `None`.
1088 ///
1089 /// # Returns
1090 ///
1091 /// A list with the various cookies that can be sent in a HTTP request for the specified URI.
1092 /// If a `None` URI was supplied, all available non-expired cookies will be returned.
1093 pub fn get_cookies(_uri: Option<&str>) -> Cookie<'_> {
1094 // Implementation for get_cookies
1095 unimplemented!()
1096 }
1097
1098 /// Returns a list containing the `ClientListener` instances that were added to this client.
1099 ///
1100 /// # Returns
1101 ///
1102 /// A list containing the listeners that were added to this client.
1103 ///
1104 /// See also `addListener()`
1105 pub fn get_listeners(&self) -> &Vec<Box<dyn ClientListener>> {
1106 &self.listeners
1107 }
1108
1109 /// Inquiry method that gets the current client status and transport (when applicable).
1110 ///
1111 /// # Returns
1112 ///
1113 /// The current client status. It can be one of the following values:
1114 ///
1115 /// - `"CONNECTING"`: the client is waiting for a Server's response in order to establish a connection;
1116 /// - `"CONNECTED:STREAM-SENSING"`: the client has received a preliminary response from the server
1117 /// and is currently verifying if a streaming connection is possible;
1118 /// - `"CONNECTED:WS-STREAMING"`: a streaming connection over WebSocket is active;
1119 /// - `"CONNECTED:HTTP-STREAMING"`: a streaming connection over HTTP is active;
1120 /// - `"CONNECTED:WS-POLLING"`: a polling connection over WebSocket is in progress;
1121 /// - `"CONNECTED:HTTP-POLLING"`: a polling connection over HTTP is in progress;
1122 /// - `"STALLED"`: the Server has not been sending data on an active streaming connection for
1123 /// longer than a configured time;
1124 /// - `"DISCONNECTED:WILL-RETRY"`: no connection is currently active but one will be opened
1125 /// (possibly after a timeout);
1126 /// - `"DISCONNECTED:TRYING-RECOVERY"`: no connection is currently active, but one will be opened
1127 /// as soon as possible, as an attempt to recover the current session after a connection issue;
1128 /// - `"DISCONNECTED"`: no connection is currently active.
1129 ///
1130 /// See also `ClientListener.onStatusChange()`
1131 pub fn get_status(&self) -> &ClientStatus {
1132 &self.status
1133 }
1134
1135 /// Inquiry method that returns a list containing all the `Subscription` instances that are
1136 /// currently "active" on this `LightstreamerClient`.
1137 ///
1138 /// Internal second-level `Subscription` are not included.
1139 ///
1140 /// # Returns
1141 ///
1142 /// A list, containing all the `Subscription` currently "active" on this `LightstreamerClient`.
1143 /// The list can be empty.
1144 ///
1145 /// See also `subscribe()`
1146 pub fn get_subscriptions(&self) -> &Vec<Subscription> {
1147 &self.subscriptions
1148 }
1149
1150 /// Creates a new instance of `LightstreamerClient`.
1151 ///
1152 /// The constructor initializes the client with the server address and adapter set, if provided.
1153 /// It sets up the connection details and options for the client. If no server address or
1154 /// adapter set is specified, those properties on the client will be `None`. This allows
1155 /// for late configuration of these details before connecting to the Lightstreamer server.
1156 ///
1157 /// # Arguments
1158 /// * `server_address` - An optional reference to a string slice that represents the server
1159 /// address to connect to. If `None`, the server address must be set later.
1160 /// * `adapter_set` - An optional reference to a string slice that specifies the adapter set name.
1161 /// If `None`, the adapter set must be set later.
1162 ///
1163 /// # Returns
1164 /// A result containing the new `LightstreamerClient` instance if successful, or an
1165 /// `IllegalStateException` if the initialization fails due to invalid state conditions.
1166 ///
1167 /// # Panics
1168 /// Does not panic under normal circumstances. However, unexpected internal errors during
1169 /// the creation of internal components could cause panics, which should be considered when
1170 /// using this function in production code.
1171 ///
1172 pub fn new(
1173 server_address: Option<&str>,
1174 adapter_set: Option<&str>,
1175 username: Option<&str>,
1176 password: Option<&str>,
1177 ) -> Result<LightstreamerClient, LightstreamerError> {
1178 let connection_details =
1179 ConnectionDetails::new(server_address, adapter_set, username, password)?;
1180 let connection_options = ConnectionOptions::default();
1181 let (subscription_sender, subscription_receiver) = channel(100);
1182
1183 Ok(LightstreamerClient {
1184 server_address: server_address.map(|s| s.to_string()),
1185 adapter_set: adapter_set.map(|s| s.to_string()),
1186 connection_details,
1187 connection_options,
1188 listeners: Vec::new(),
1189 subscriptions: Vec::new(),
1190 status: ClientStatus::Disconnected(DisconnectionType::WillRetry),
1191 logging: LogType::StdLogs,
1192 subscription_sender,
1193 subscription_receiver,
1194 connection_manager: None,
1195 reconnection_config: ReconnectionConfig::default(),
1196 heartbeat_config: HeartbeatConfig::default(),
1197 auto_reconnect_enabled: false,
1198 })
1199 }
1200
1201 /// Removes a listener from the `LightstreamerClient` instance so that it will not receive
1202 /// events anymore.
1203 ///
1204 /// A listener can be removed at any time.
1205 ///
1206 /// # Parameters
1207 ///
1208 /// * `listener`: The listener to be removed.
1209 ///
1210 /// See also `addListener()`
1211 pub fn remove_listener(&mut self, _listener: Box<dyn ClientListener>) {
1212 unimplemented!("Implement mechanism to remove listener from LightstreamerClient");
1213 //self.listeners.remove(&listener);
1214 }
1215
1216 /// Operation method that sends a message to the Server. The message is interpreted and handled
1217 /// by the Metadata Adapter associated to the current Session. This operation supports in-order
1218 /// guaranteed message delivery with automatic batching. In other words, messages are guaranteed
1219 /// to arrive exactly once and respecting the original order, whatever is the underlying transport
1220 /// (HTTP or WebSockets). Furthermore, high frequency messages are automatically batched, if
1221 /// necessary, to reduce network round trips.
1222 ///
1223 /// Upon subsequent calls to the method, the sequential management of the involved messages is
1224 /// guaranteed. The ordering is determined by the order in which the calls to `sendMessage` are
1225 /// issued.
1226 ///
1227 /// If a message, for any reason, doesn't reach the Server (this is possible with the HTTP transport),
1228 /// it will be resent; however, this may cause the subsequent messages to be delayed. For this
1229 /// reason, each message can specify a "delayTimeout", which is the longest time the message,
1230 /// after reaching the Server, can be kept waiting if one of more preceding messages haven't
1231 /// been received yet. If the "delayTimeout" expires, these preceding messages will be discarded;
1232 /// any discarded message will be notified to the listener through `ClientMessageListener.onDiscarded()`.
1233 /// Note that, because of the parallel transport of the messages, if a zero or very low timeout
1234 /// is set for a message and the previous message was sent immediately before, it is possible
1235 /// that the latter gets discarded even if no communication issues occur. The Server may also
1236 /// enforce its own timeout on missing messages, to prevent keeping the subsequent messages for
1237 /// long time.
1238 ///
1239 /// Sequence identifiers can also be associated with the messages. In this case, the sequential
1240 /// management is restricted to all subsets of messages with the same sequence identifier associated.
1241 ///
1242 /// Notifications of the operation outcome can be received by supplying a suitable listener. The
1243 /// supplied listener is guaranteed to be eventually invoked; listeners associated with a sequence
1244 /// are guaranteed to be invoked sequentially.
1245 ///
1246 /// The "UNORDERED_MESSAGES" sequence name has a special meaning. For such a sequence, immediate
1247 /// processing is guaranteed, while strict ordering and even sequentialization of the processing
1248 /// is not enforced. Likewise, strict ordering of the notifications is not enforced. However,
1249 /// messages that, for any reason, should fail to reach the Server whereas subsequent messages
1250 /// had succeeded, might still be discarded after a server-side timeout, in order to ensure that
1251 /// the listener eventually gets a notification.
1252 ///
1253 /// Moreover, if "UNORDERED_MESSAGES" is used and no listener is supplied, a "fire and forget"
1254 /// scenario is assumed. In this case, no checks on missing, duplicated or overtaken messages
1255 /// are performed at all, so as to optimize the processing and allow the highest possible throughput.
1256 ///
1257 /// Since a message is handled by the Metadata Adapter associated to the current connection, a
1258 /// message can be sent only if a connection is currently active. If the special `enqueueWhileDisconnected`
1259 /// flag is specified it is possible to call the method at any time and the client will take
1260 /// care of sending the message as soon as a connection is available, otherwise, if the current
1261 /// status is "DISCONNECTED*", the message will be abandoned and the `ClientMessageListener.onAbort()`
1262 /// event will be fired.
1263 ///
1264 /// Note that, in any case, as soon as the status switches again to "DISCONNECTED*", any message
1265 /// still pending is aborted, including messages that were queued with the `enqueueWhileDisconnected`
1266 /// flag set to `true`.
1267 ///
1268 /// Also note that forwarding of the message to the server is made in a separate thread, hence,
1269 /// if a message is sent while the connection is active, it could be aborted because of a subsequent
1270 /// disconnection. In the same way a message sent while the connection is not active might be
1271 /// sent because of a subsequent connection.
1272 ///
1273 /// # Parameters
1274 ///
1275 /// * `message`: a text message, whose interpretation is entirely demanded to the Metadata Adapter
1276 /// associated to the current connection.
1277 /// * `sequence`: an alphanumeric identifier, used to identify a subset of messages to be managed
1278 /// in sequence; underscore characters are also allowed. If the "UNORDERED_MESSAGES" identifier
1279 /// is supplied, the message will be processed in the special way described above. The parameter
1280 /// is optional; if set to `None`, "UNORDERED_MESSAGES" is used as the sequence name.
1281 /// * `delay_timeout`: a timeout, expressed in milliseconds. If higher than the Server configured
1282 /// timeout on missing messages, the latter will be used instead. The parameter is optional; if
1283 /// a negative value is supplied, the Server configured timeout on missing messages will be applied.
1284 /// This timeout is ignored for the special "UNORDERED_MESSAGES" sequence, although a server-side
1285 /// timeout on missing messages still applies.
1286 /// * `listener`: an object suitable for receiving notifications about the processing outcome. The
1287 /// parameter is optional; if not supplied, no notification will be available.
1288 /// * `enqueue_while_disconnected`: if this flag is set to `true`, and the client is in a disconnected
1289 /// status when the provided message is handled, then the message is not aborted right away but
1290 /// is queued waiting for a new session. Note that the message can still be aborted later when
1291 /// a new session is established.
1292 pub fn send_message(
1293 &mut self,
1294 message: &str,
1295 sequence: Option<&str>,
1296 _delay_timeout: Option<u64>,
1297 listener: Option<Box<dyn ClientMessageListener>>,
1298 enqueue_while_disconnected: bool,
1299 ) {
1300 let _sequence = sequence.unwrap_or("UNORDERED_MESSAGES");
1301
1302 // Handle the message based on the current connection status
1303 match &self.status {
1304 ClientStatus::Connected(_connection_type) => {
1305 // Send the message to the server in a separate thread
1306 // ...
1307 }
1308 ClientStatus::Disconnected(_disconnection_type) => {
1309 if enqueue_while_disconnected {
1310 // Enqueue the message to be sent when a connection is available
1311 // ...
1312 } else {
1313 // Abort the message and notify the listener
1314 if let Some(listener) = listener {
1315 listener.on_abort(message, false);
1316 }
1317 }
1318 }
1319 _ => {
1320 // Enqueue the message to be sent when a connection is available
1321 // ...
1322 }
1323 }
1324 unimplemented!("Complete mechanism to send message to LightstreamerClient.");
1325 }
1326
1327 /// Static method that permits to configure the logging system used by the library. The logging
1328 /// system must respect the `LoggerProvider` interface. A custom class can be used to wrap any
1329 /// third-party logging system.
1330 ///
1331 /// If no logging system is specified, all the generated log is discarded.
1332 ///
1333 /// The following categories are available to be consumed:
1334 ///
1335 /// - `lightstreamer.stream`: logs socket activity on Lightstreamer Server connections; at INFO
1336 /// level, socket operations are logged; at DEBUG level, read/write data exchange is logged.
1337 /// - `lightstreamer.protocol`: logs requests to Lightstreamer Server and Server answers; at INFO
1338 /// level, requests are logged; at DEBUG level, request details and events from the Server are logged.
1339 /// - `lightstreamer.session`: logs Server Session lifecycle events; at INFO level, lifecycle events
1340 /// are logged; at DEBUG level, lifecycle event details are logged.
1341 /// - `lightstreamer.subscriptions`: logs subscription requests received by the clients and the related
1342 /// updates; at WARN level, alert events from the Server are logged; at INFO level, subscriptions
1343 /// and unsubscriptions are logged; at DEBUG level, requests batching and update details are logged.
1344 /// - `lightstreamer.actions`: logs settings / API calls.
1345 ///
1346 /// # Parameters
1347 ///
1348 /// * `provider`: A `LoggerProvider` instance that will be used to generate log messages by the
1349 /// library classes.
1350 pub fn set_logger_provider() {
1351 unimplemented!("Implement mechanism to set logger provider for LightstreamerClient.");
1352 }
1353 /*
1354 pub fn set_logger_provider(provider: LoggerProvider) {
1355 // Implementation for set_logger_provider
1356 }
1357 */
1358
1359 /// Provides a mean to control the way TLS certificates are evaluated, with the possibility to
1360 /// accept untrusted ones.
1361 ///
1362 /// May be called only once before creating any `LightstreamerClient` instance.
1363 ///
1364 /// # Parameters
1365 ///
1366 /// * `factory`: an instance of `ssl.SSLContext`
1367 ///
1368 /// # Raises
1369 ///
1370 /// * `IllegalArgumentException`: if the factory is `None`
1371 /// * `IllegalStateException`: if a factory is already installed
1372 pub fn set_trust_manager_factory() {
1373 unimplemented!("Implement mechanism to set trust manager factory for LightstreamerClient.");
1374 }
1375 /*
1376 pub fn set_trust_manager_factory(factory: Option<SslContext>) -> Result<(), IllegalArgumentException> {
1377 if factory.is_none() {
1378 return Err(IllegalArgumentException::new(
1379 "Factory cannot be None",
1380 ));
1381 }
1382
1383 // Implementation for set_trust_manager_factory
1384 Ok(())
1385 }
1386 */
1387
1388 /// Adds a subscription to the `LightstreamerClient` instance.
1389 ///
1390 /// Active subscriptions are subscribed to through the server as soon as possible (i.e. as soon
1391 /// as there is a session available). Active `Subscription` are automatically persisted across different
1392 /// sessions as long as a related unsubscribe call is not issued.
1393 ///
1394 /// Subscriptions can be given to the `LightstreamerClient` at any time. Once done the `Subscription`
1395 /// immediately enters the "active" state.
1396 ///
1397 /// Once "active", a `Subscription` instance cannot be provided again to a `LightstreamerClient`
1398 /// unless it is first removed from the "active" state through a call to `unsubscribe()`.
1399 ///
1400 /// Also note that forwarding of the subscription to the server is made in a separate thread.
1401 ///
1402 /// A successful subscription to the server will be notified through a `SubscriptionListener.onSubscription()`
1403 /// event.
1404 ///
1405 /// # Parameters
1406 ///
1407 /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1408 /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1409 /// values.
1410 ///
1411 /// See also `unsubscribe()`
1412 ///
1413 /// # Errors
1414 ///
1415 /// Returns an error if the subscription channel is closed.
1416 pub async fn subscribe(
1417 subscription_sender: Sender<SubscriptionRequest>,
1418 subscription: Subscription,
1419 ) -> Result<(), LightstreamerError> {
1420 subscription_sender
1421 .send(SubscriptionRequest {
1422 subscription: Some(subscription),
1423 subscription_id: None,
1424 })
1425 .await
1426 .map_err(|e| LightstreamerError::channel(format!("Failed to send subscription: {}", e)))
1427 }
1428
1429 /// If you want to be able to unsubscribe from a subscription, you need to keep track of the id
1430 /// of the subscription. This blocking method allows you to wait for the id of the subscription
1431 /// to be returned.
1432 ///
1433 /// # Parameters
1434 ///
1435 /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1436 /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1437 ///
1438 pub async fn subscribe_get_id(
1439 subscription_sender: Sender<SubscriptionRequest>,
1440 mut subscription: Subscription,
1441 ) -> Result<usize, LightstreamerError> {
1442 // Extract the id_receiver before sending the subscription
1443 let mut id_receiver = subscription.id_receiver;
1444
1445 // Create a new channel for the subscription we're about to send
1446 let (_new_sender, new_receiver) = channel(1);
1447 subscription.id_receiver = new_receiver;
1448
1449 // Send the subscription
1450 LightstreamerClient::subscribe(subscription_sender, subscription).await?;
1451
1452 // Wait for the ID to be updated through the channel
1453 match id_receiver.recv().await {
1454 Some(id) => Ok(id),
1455 None => Err(LightstreamerError::invalid_state(
1456 "Failed to get subscription id",
1457 )),
1458 }
1459 }
1460
1461 /// Operation method that removes a `Subscription` that is currently in the "active" state.
1462 ///
1463 /// By bringing back a `Subscription` to the "inactive" state, the unsubscription from all its
1464 /// items is requested to Lightstreamer Server.
1465 ///
1466 /// Subscription can be unsubscribed from at any time. Once done the `Subscription` immediately
1467 /// exits the "active" state.
1468 ///
1469 /// Note that forwarding of the unsubscription to the server is made in a separate thread.
1470 ///
1471 /// The unsubscription will be notified through a `SubscriptionListener.onUnsubscription()` event.
1472 ///
1473 /// # Parameters
1474 ///
1475 /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1476 /// * `subscription_id`: The id of the subscription to be unsubscribed from.
1477 /// instance.
1478 ///
1479 /// # Errors
1480 ///
1481 /// Returns an error if the subscription channel is closed.
1482 pub async fn unsubscribe(
1483 subscription_sender: Sender<SubscriptionRequest>,
1484 subscription_id: usize,
1485 ) -> Result<(), LightstreamerError> {
1486 subscription_sender
1487 .send(SubscriptionRequest {
1488 subscription: None,
1489 subscription_id: Some(subscription_id),
1490 })
1491 .await
1492 .map_err(|e| {
1493 LightstreamerError::channel(format!("Failed to send unsubscription: {}", e))
1494 })
1495 }
1496
1497 /// Method setting enum for the logging of this instance.
1498 ///
1499 /// Default logging type is StdLogs, corresponding to `stdout`
1500 ///
1501 /// `LightstreamerClient` has methods for logging that are compatible with the `Tracing` crate.
1502 /// Enabling logging for the `Tracing` crate requires implementation of a tracing subscriber
1503 /// and its configuration and formatting.
1504 ///
1505 /// # Parameters
1506 ///
1507 /// * `logging`: An enum declaring the logging type of this `LightstreamerClient` instance.
1508 pub fn set_logging_type(&mut self, logging: LogType) {
1509 self.logging = logging;
1510 }
1511
1512 /// Method for logging messages
1513 ///
1514 /// Match case wraps log types. `loglevel` param ignored in StdLogs case, all output to stdout.
1515 ///
1516 /// # Parameters
1517 ///
1518 /// * `loglevel` Enum determining use of stdout or Tracing subscriber.
1519 pub fn make_log(&mut self, loglevel: Level, log: &str) {
1520 match self.logging {
1521 LogType::StdLogs => {
1522 debug!("{}", log);
1523 }
1524 LogType::TracingLogs => match loglevel {
1525 Level::INFO => {
1526 info!(log);
1527 }
1528 Level::WARN => {
1529 warn!(log);
1530 }
1531 Level::ERROR => {
1532 error!(log);
1533 }
1534 Level::TRACE => {
1535 trace!(log);
1536 }
1537 Level::DEBUG => {
1538 debug!(log);
1539 }
1540 },
1541 }
1542 }
1543
1544 /// Enables automatic reconnection with default configuration.
1545 pub fn enable_auto_reconnect(&mut self) {
1546 self.auto_reconnect_enabled = true;
1547 // ConnectionManager will be created during connect() to avoid circular dependency
1548 }
1549
1550 /// Enables automatic reconnection with the specified configuration.
1551 ///
1552 /// # Parameters
1553 ///
1554 /// * `reconnection_config`: Configuration for reconnection behavior
1555 /// * `heartbeat_config`: Configuration for heartbeat monitoring
1556 pub fn enable_auto_reconnect_with_config(
1557 &mut self,
1558 reconnection_config: ReconnectionConfig,
1559 heartbeat_config: HeartbeatConfig,
1560 ) -> Result<(), LightstreamerError> {
1561 self.auto_reconnect_enabled = true;
1562 self.reconnection_config = reconnection_config;
1563 self.heartbeat_config = heartbeat_config;
1564 self.auto_reconnect_enabled = true;
1565 // ConnectionManager will be created during connect() to avoid circular dependency
1566 Ok(())
1567 }
1568
1569 /// Disables automatic reconnection.
1570 pub async fn disable_auto_reconnect(&mut self) {
1571 self.auto_reconnect_enabled = false;
1572 if let Some(manager) = &self.connection_manager {
1573 manager.shutdown().await;
1574 }
1575 self.connection_manager = None;
1576 }
1577
1578 /// Returns whether automatic reconnection is currently enabled.
1579 pub fn is_auto_reconnect_enabled(&self) -> bool {
1580 self.auto_reconnect_enabled
1581 }
1582
1583 /// Gets the current reconnection configuration.
1584 pub fn get_reconnection_config(&self) -> &ReconnectionConfig {
1585 &self.reconnection_config
1586 }
1587
1588 /// Gets the current heartbeat configuration.
1589 pub fn get_heartbeat_config(&self) -> &HeartbeatConfig {
1590 &self.heartbeat_config
1591 }
1592
1593 /// Updates the reconnection configuration.
1594 pub fn set_reconnection_config(&mut self, config: ReconnectionConfig) {
1595 self.reconnection_config = config;
1596 // Note: ConnectionManager doesn't support runtime config updates
1597 // The configuration is set during ConnectionManager creation
1598 }
1599
1600 /// Updates the heartbeat configuration.
1601 pub fn set_heartbeat_config(&mut self, config: HeartbeatConfig) {
1602 self.heartbeat_config = config;
1603 // Note: ConnectionManager doesn't support runtime config updates
1604 // The configuration is set during ConnectionManager creation
1605 }
1606
1607 /// Gets the current connection state from the connection manager.
1608 pub async fn get_connection_state(&self) -> ConnectionState {
1609 if let Some(manager) = &self.connection_manager {
1610 manager.get_connection_state().await
1611 } else {
1612 ConnectionState::Disconnected
1613 }
1614 }
1615
1616 /// Gets connection metrics from the connection manager.
1617 pub async fn get_connection_metrics(&self) -> crate::connection::management::ConnectionMetrics {
1618 if let Some(manager) = &self.connection_manager {
1619 manager.get_metrics().await
1620 } else {
1621 crate::connection::management::ConnectionMetrics::default()
1622 }
1623 }
1624
1625 /// Forces an immediate reconnection attempt if auto-reconnect is enabled.
1626 pub async fn force_reconnect(&mut self) -> Result<(), LightstreamerError> {
1627 if !self.auto_reconnect_enabled {
1628 return Err(LightstreamerError::invalid_state(
1629 "Auto-reconnect is not enabled",
1630 ));
1631 }
1632
1633 if let Some(manager) = &mut self.connection_manager {
1634 manager.force_reconnect().await?;
1635 }
1636
1637 Ok(())
1638 }
1639}
1640
1641/// Computes the effective liveness window after the server declares its
1642/// keepalive interval in the `conok` message.
1643///
1644/// Returns the server-declared value when no window was configured
1645/// (`current_ms == 0`), otherwise the larger of the two — the window may only
1646/// widen, never shrink, so a server declaring a longer keepalive cannot cause
1647/// false-positive disconnects.
1648#[must_use]
1649#[inline]
1650fn widen_keepalive_timeout(current_ms: u64, server_declared_ms: u64) -> u64 {
1651 if current_ms == 0 {
1652 server_declared_ms
1653 } else {
1654 current_ms.max(server_declared_ms)
1655 }
1656}
1657
1658#[cfg(test)]
1659mod tests {
1660 use super::*;
1661 use crate::subscription::{Subscription, SubscriptionListener, SubscriptionMode};
1662
1663 use std::fmt::Debug;
1664 use std::sync::{Arc, Mutex};
1665 use tokio::sync::Notify;
1666
1667 #[derive(Debug)]
1668 struct MockClientListener {
1669 property_changes: Arc<Mutex<Vec<String>>>,
1670 status_changes: Arc<Mutex<Vec<String>>>,
1671 server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1672 }
1673
1674 impl MockClientListener {
1675 fn new() -> Self {
1676 MockClientListener {
1677 property_changes: Arc::new(Mutex::new(Vec::new())),
1678 status_changes: Arc::new(Mutex::new(Vec::new())),
1679 server_errors: Arc::new(Mutex::new(Vec::new())),
1680 }
1681 }
1682
1683 #[allow(dead_code)]
1684 fn with_shared_data(
1685 property_changes: Arc<Mutex<Vec<String>>>,
1686 status_changes: Arc<Mutex<Vec<String>>>,
1687 server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1688 ) -> Self {
1689 MockClientListener {
1690 property_changes,
1691 status_changes,
1692 server_errors,
1693 }
1694 }
1695 }
1696
1697 impl ClientListener for MockClientListener {
1698 fn on_property_change(&self, property: &str) {
1699 if let Ok(mut guard) = self.property_changes.lock() {
1700 guard.push(property.to_string());
1701 }
1702 }
1703
1704 fn on_status_change(&self, status: &str) {
1705 if let Ok(mut guard) = self.status_changes.lock() {
1706 guard.push(status.to_string());
1707 }
1708 }
1709
1710 fn on_server_error(&self, code: i32, message: &str) {
1711 if let Ok(mut guard) = self.server_errors.lock() {
1712 guard.push((code, message.to_string()));
1713 }
1714 }
1715 }
1716
1717 #[allow(dead_code)]
1718 struct MockSubscriptionListener;
1719
1720 impl SubscriptionListener for MockSubscriptionListener {
1721 fn on_subscription(&mut self) {}
1722 fn on_unsubscription(&mut self) {}
1723 fn on_item_update(&self, _update: &ItemUpdate) {}
1724 }
1725
1726 impl Debug for MockSubscriptionListener {
1727 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1728 write!(f, "MockSubscriptionListener")
1729 }
1730 }
1731
1732 #[allow(dead_code)]
1733 struct LightstreamerClientTestContext {
1734 client: LightstreamerClient,
1735 property_changes: Arc<Mutex<Vec<String>>>,
1736 status_changes: Arc<Mutex<Vec<String>>>,
1737 server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1738 }
1739
1740 impl LightstreamerClientTestContext {
1741 #[allow(dead_code)]
1742 fn new() -> Result<Self, LightstreamerError> {
1743 let property_changes = Arc::new(Mutex::new(Vec::new()));
1744 let status_changes = Arc::new(Mutex::new(Vec::new()));
1745 let server_errors = Arc::new(Mutex::new(Vec::new()));
1746 let listener = MockClientListener::with_shared_data(
1747 Arc::clone(&property_changes),
1748 Arc::clone(&status_changes),
1749 Arc::clone(&server_errors),
1750 );
1751
1752 let mut client = LightstreamerClient::new(
1753 Some("http://test.lightstreamer.com"),
1754 Some("DEMO"),
1755 None,
1756 None,
1757 )?;
1758 client.add_listener(Box::new(listener));
1759
1760 Ok(LightstreamerClientTestContext {
1761 client,
1762 property_changes,
1763 status_changes,
1764 server_errors,
1765 })
1766 }
1767 }
1768
1769 #[test]
1770 fn test_new_lightstreamer_client() -> Result<(), LightstreamerError> {
1771 let client = LightstreamerClient::new(
1772 Some("http://test.lightstreamer.com"),
1773 Some("DEMO"),
1774 None,
1775 None,
1776 )?;
1777 assert_eq!(
1778 client.server_address,
1779 Some("http://test.lightstreamer.com".to_string())
1780 );
1781 assert_eq!(client.adapter_set, Some("DEMO".to_string()));
1782 let client = LightstreamerClient::new(
1783 Some("http://test.lightstreamer.com"),
1784 Some("DEMO"),
1785 Some("user1"),
1786 Some("pass1"),
1787 )?;
1788 assert_eq!(
1789 client.connection_details.get_user(),
1790 Some(&"user1".to_string())
1791 );
1792 assert_eq!(
1793 client.connection_details.get_password(),
1794 Some(&"pass1".to_string())
1795 );
1796 let result = LightstreamerClient::new(Some("invalid-url"), Some("DEMO"), None, None);
1797 assert!(result.is_err());
1798 let client = LightstreamerClient::new(None, Some("DEMO"), None, None)?;
1799 assert_eq!(client.server_address, None);
1800 Ok(())
1801 }
1802
1803 #[test]
1804 fn test_add_listener() -> Result<(), LightstreamerError> {
1805 let mut client = LightstreamerClient::new(
1806 Some("http://test.lightstreamer.com"),
1807 Some("DEMO"),
1808 None,
1809 None,
1810 )?;
1811 assert_eq!(client.listeners.len(), 0);
1812 let listener = Box::new(MockClientListener::new());
1813 client.add_listener(listener);
1814 assert_eq!(client.listeners.len(), 1);
1815 let listener2 = Box::new(MockClientListener::new());
1816 client.add_listener(listener2);
1817 assert_eq!(client.listeners.len(), 2);
1818 Ok(())
1819 }
1820
1821 #[test]
1822 fn test_get_listeners() -> Result<(), LightstreamerError> {
1823 let mut client = LightstreamerClient::new(
1824 Some("http://test.lightstreamer.com"),
1825 Some("DEMO"),
1826 None,
1827 None,
1828 )?;
1829 assert_eq!(client.get_listeners().len(), 0);
1830
1831 let listener = Box::new(MockClientListener::new());
1832 client.add_listener(listener);
1833 assert_eq!(client.get_listeners().len(), 1);
1834 Ok(())
1835 }
1836
1837 #[test]
1838 fn test_get_status() -> Result<(), LightstreamerError> {
1839 let client = LightstreamerClient::new(
1840 Some("http://test.lightstreamer.com"),
1841 Some("DEMO"),
1842 None,
1843 None,
1844 )?;
1845 match client.get_status() {
1846 ClientStatus::Disconnected(DisconnectionType::WillRetry) => {}
1847 _ => panic!("Expected initial status to be DISCONNECTED:WILL-RETRY"),
1848 }
1849 Ok(())
1850 }
1851
1852 #[test]
1853 fn test_get_subscriptions() -> Result<(), LightstreamerError> {
1854 let client = LightstreamerClient::new(
1855 Some("http://test.lightstreamer.com"),
1856 Some("DEMO"),
1857 None,
1858 None,
1859 )?;
1860 assert_eq!(client.get_subscriptions().len(), 0);
1861 Ok(())
1862 }
1863
1864 #[tokio::test]
1865 async fn test_connect_with_no_server_address() -> Result<(), LightstreamerError> {
1866 let client = LightstreamerClient::new(None, Some("DEMO"), None, None)?;
1867 let client_arc = Arc::new(tokio::sync::Mutex::new(client));
1868 let shutdown_signal = Arc::new(Notify::new());
1869 let result = LightstreamerClient::connect(client_arc, shutdown_signal).await;
1870 assert!(result.is_err());
1871 Ok(())
1872 }
1873
1874 #[tokio::test]
1875 async fn test_forced_transport_validation() -> Result<(), LightstreamerError> {
1876 let mut client = LightstreamerClient::new(
1877 Some("http://test.lightstreamer.com"),
1878 Some("DEMO"),
1879 None,
1880 None,
1881 )?;
1882
1883 client.connection_options.set_forced_transport(None);
1884 let client_arc = Arc::new(tokio::sync::Mutex::new(client));
1885 let shutdown_signal = Arc::new(Notify::new());
1886 let result = LightstreamerClient::connect(client_arc.clone(), shutdown_signal).await;
1887 assert!(result.is_err());
1888 client_arc
1889 .lock()
1890 .await
1891 .connection_options
1892 .set_forced_transport(Some(Transport::WsStreaming));
1893 Ok(())
1894 }
1895
1896 #[test]
1897 fn test_subscription_params_generation() -> Result<(), LightstreamerError> {
1898 let subscription = Subscription::new(
1899 SubscriptionMode::Merge,
1900 Some(vec!["item1".to_string(), "item2".to_string()]),
1901 Some(vec!["field1".to_string(), "field2".to_string()]),
1902 )?;
1903
1904 let params_str = LightstreamerClient::get_subscription_params(&subscription, 1)?;
1905
1906 assert!(params_str.contains("LS_reqId=1"));
1907 assert!(params_str.contains("LS_op=add"));
1908 assert!(params_str.contains("LS_subId="));
1909 assert!(params_str.contains("LS_mode=MERGE"));
1910 assert!(params_str.contains("LS_group="));
1911 assert!(params_str.contains("LS_schema="));
1912 Ok(())
1913 }
1914
1915 #[test]
1916 fn test_unsubscription_params_generation() -> Result<(), LightstreamerError> {
1917 let params_str = LightstreamerClient::get_unsubscription_params(42, 123)?;
1918
1919 assert!(params_str.contains("LS_reqId=123"));
1920 assert!(params_str.contains("LS_op=delete"));
1921 assert!(params_str.contains("LS_subId=42"));
1922 Ok(())
1923 }
1924
1925 #[test]
1926 fn test_logging_functions() -> Result<(), LightstreamerError> {
1927 let mut client = LightstreamerClient::new(
1928 Some("http://test.lightstreamer.com"),
1929 Some("DEMO"),
1930 None,
1931 None,
1932 )?;
1933
1934 client.set_logging_type(LogType::StdLogs);
1935
1936 client.make_log(Level::INFO, "Test log message");
1937 client.make_log(Level::DEBUG, "Test debug message");
1938 client.set_logging_type(LogType::TracingLogs);
1939 client.make_log(Level::INFO, "Test tracing log message");
1940 client.make_log(Level::DEBUG, "Test tracing debug message");
1941 Ok(())
1942 }
1943
1944 #[test]
1945 fn test_debug_implementation() -> Result<(), LightstreamerError> {
1946 let client = LightstreamerClient::new(
1947 Some("http://test.lightstreamer.com"),
1948 Some("DEMO"),
1949 None,
1950 None,
1951 )?;
1952
1953 // Test that Debug implementation works without panicking
1954 let debug_string = format!("{:?}", client);
1955
1956 // Verify it contains expected fields
1957 assert!(debug_string.contains("server_address"));
1958 assert!(debug_string.contains("adapter_set"));
1959 assert!(debug_string.contains("connection_details"));
1960 assert!(debug_string.contains("connection_options"));
1961 assert!(debug_string.contains("listeners"));
1962 assert!(debug_string.contains("subscriptions"));
1963
1964 // Verify the values are included
1965 assert!(debug_string.contains("http://test.lightstreamer.com"));
1966 assert!(debug_string.contains("DEMO"));
1967 Ok(())
1968 }
1969
1970 #[test]
1971 #[should_panic(expected = "Implement mechanism to add cookies to LightstreamerClient")]
1972 fn test_add_cookies() {
1973 // Test the static method add_cookies
1974 let cookie = Cookie::new("test_cookie", "test_value");
1975 LightstreamerClient::add_cookies("http://test.lightstreamer.com", &cookie);
1976 }
1977
1978 #[test]
1979 #[should_panic(expected = "not implemented")]
1980 fn test_get_cookies() {
1981 // Test the static method get_cookies
1982 LightstreamerClient::get_cookies(Some("http://test.lightstreamer.com"));
1983 }
1984}
1985
1986#[cfg(test)]
1987mod liveness_tests {
1988 use super::widen_keepalive_timeout;
1989
1990 #[test]
1991 fn test_widen_keepalive_timeout_unconfigured_adopts_server_value() {
1992 assert_eq!(widen_keepalive_timeout(0, 5_000), 5_000);
1993 }
1994
1995 #[test]
1996 fn test_widen_keepalive_timeout_server_longer_widens() {
1997 assert_eq!(widen_keepalive_timeout(15_000, 30_000), 30_000);
1998 }
1999
2000 #[test]
2001 fn test_widen_keepalive_timeout_server_shorter_never_shrinks() {
2002 assert_eq!(widen_keepalive_timeout(15_000, 5_000), 15_000);
2003 }
2004
2005 #[test]
2006 fn test_widen_keepalive_timeout_equal_values_unchanged() {
2007 assert_eq!(widen_keepalive_timeout(15_000, 15_000), 15_000);
2008 }
2009}