lightstreamer_rs/client/implementation.rs
1use crate::subscription::{ItemUpdate, Snapshot, Subscription, SubscriptionMode};
2
3use crate::client::Transport;
4pub(crate) use crate::client::listener::ClientListener;
5use crate::client::message_listener::ClientMessageListener;
6use crate::client::model::{ClientStatus, DisconnectionType, LogType};
7use crate::client::request::SubscriptionRequest;
8use crate::client::utils::get_subscription_by_id;
9use crate::connection::{ConnectionDetails, ConnectionOptions};
10use crate::connection::{ConnectionManager, ConnectionState, HeartbeatConfig, ReconnectionConfig};
11use crate::utils::{LightstreamerError, clean_message, parse_arguments};
12use cookie::Cookie;
13use futures_util::{SinkExt, StreamExt};
14use std::collections::HashMap;
15use std::fmt::{self, Debug, Formatter};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::mpsc::channel;
19use tokio::sync::{
20 Mutex, Notify,
21 mpsc::{Receiver, Sender},
22};
23use tokio::time::Instant as TokioInstant;
24use tokio_tungstenite::{
25 connect_async,
26 tungstenite::{
27 Message,
28 http::{HeaderName, HeaderValue, Request},
29 },
30};
31use tracing::{Level, debug, error, info, instrument, trace, warn};
32use url::Url;
33
34/// Facade class for the management of the communication to Lightstreamer Server. Used to provide
35/// configuration settings, event handlers, operations for the control of the connection lifecycle,
36/// Subscription handling and to send messages.
37///
38/// An instance of `LightstreamerClient` handles the communication with Lightstreamer Server on a
39/// specified endpoint. Hence, it hosts one "Session"; or, more precisely, a sequence of Sessions,
40/// since any Session may fail and be recovered, or it can be interrupted on purpose. So, normally,
41/// a single instance of `LightstreamerClient` is needed.
42///
43/// However, multiple instances of `LightstreamerClient` can be used, toward the same or multiple
44/// endpoints.
45///
46/// You can listen to the events generated by a session by registering an event listener, such as
47/// `ClientListener` or `SubscriptionListener`. These listeners allow you to handle various events,
48/// such as session creation, connection status, subscription updates, and server messages. However,
49/// you should be aware that the event notifications are dispatched by a single thread, the so-called
50/// event thread. This means that if the operations of a listener are slow or blocking, they will
51/// delay the processing of the other listeners and affect the performance of your application.
52/// Therefore, you should delegate any slow or blocking operations to a dedicated thread, and keep
53/// the listener methods as fast and simple as possible. Note that even if you create multiple
54/// instances of `LightstreamerClient`, they will all use a single event thread, that is shared
55/// among them.
56///
57/// # Parameters
58///
59/// * `server_address`: the address of the Lightstreamer Server to which this `LightstreamerClient`
60/// will connect to. It is possible to specify it later by using `None` here. See
61/// `ConnectionDetails.setServerAddress()` for details.
62/// * `adapter_set`: the name of the Adapter Set mounted on Lightstreamer Server to be used to handle
63/// all requests in the Session associated with this `LightstreamerClient`. It is possible not to
64/// specify it at all or to specify it later by using `None` here. See `ConnectionDetails.setAdapterSet()`
65/// for details.
66///
67/// # Raises
68///
69/// * `IllegalArgumentException`: if a not valid address is passed. See `ConnectionDetails.setServerAddress()`
70/// for details.
71pub struct LightstreamerClient {
72 /// The address of the Lightstreamer Server to which this `LightstreamerClient` will connect.
73 server_address: Option<String>,
74 /// The name of the Adapter Set mounted on Lightstreamer Server to be used to handle all
75 /// requests in the Session associated with this `LightstreamerClient`.
76 adapter_set: Option<String>,
77 /// Data object that contains the details needed to open a connection to a Lightstreamer Server.
78 /// This instance is set up by the `LightstreamerClient` object at its own creation. Properties
79 /// of this object can be overwritten by values received from a Lightstreamer Server.
80 pub connection_details: ConnectionDetails,
81 /// Data object that contains options and policies for the connection to the server. This instance
82 /// is set up by the `LightstreamerClient` object at its own creation. Properties of this object
83 /// can be overwritten by values received from a Lightstreamer Server.
84 pub connection_options: ConnectionOptions,
85 /// A list of listeners that will receive events from the `LightstreamerClient` instance.
86 listeners: Vec<Box<dyn ClientListener>>,
87 /// A list containing all the `Subscription` instances that are currently "active" on this
88 /// `LightstreamerClient`.
89 subscriptions: Vec<Subscription>,
90 /// The current status of the client.
91 status: ClientStatus,
92 /// Logging Type to be used
93 logging: LogType,
94 /// The sender that can be used to subscribe/unsubscribe
95 pub subscription_sender: Sender<SubscriptionRequest>,
96 /// The receiver used for subscribe/unsubsribe
97 subscription_receiver: Receiver<SubscriptionRequest>,
98 /// Connection manager for automatic reconnection
99 connection_manager: Option<Arc<ConnectionManager>>,
100 /// Configuration for reconnection behavior
101 reconnection_config: ReconnectionConfig,
102 /// Configuration for heartbeat monitoring
103 heartbeat_config: HeartbeatConfig,
104 /// Whether automatic reconnection is enabled
105 auto_reconnect_enabled: bool,
106}
107
108impl Debug for LightstreamerClient {
109 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
110 f.debug_struct("LightstreamerClient")
111 .field("server_address", &self.server_address)
112 .field("adapter_set", &self.adapter_set)
113 .field("connection_details", &self.connection_details)
114 .field("connection_options", &self.connection_options)
115 .field("listeners", &self.listeners)
116 .field("subscriptions", &self.subscriptions)
117 .finish()
118 }
119}
120
121impl LightstreamerClient {
122 /// A constant string representing the name of the library.
123 pub const LIB_NAME: &'static str = "rust_client";
124
125 /// A constant string representing the version of the library.
126 pub const LIB_VERSION: &'static str = "0.1.0";
127
128 //
129 // Constants for WebSocket connection.
130 //
131 /// WebSocket key used in the WebSocket handshake process.
132 /// This is a base64-encoded value that is used to establish the WebSocket connection.
133 pub const SEC_WEBSOCKET_KEY: &'static str = "PNDUibe9ex7PnsrLbt0N4w==";
134
135 /// WebSocket protocol identifier for the TLCP protocol used by Lightstreamer.
136 /// This identifies the specific subprotocol used over the WebSocket connection.
137 pub const SEC_WEBSOCKET_PROTOCOL: &'static str = "TLCP-2.4.0.lightstreamer.com";
138
139 /// WebSocket version used for the connection.
140 /// Version 13 is the standard version used in modern WebSocket implementations.
141 pub const SEC_WEBSOCKET_VERSION: &'static str = "13";
142
143 /// WebSocket upgrade header value.
144 /// Used to indicate that the client wishes to upgrade from HTTP to WebSocket protocol.
145 pub const SEC_WEBSOCKET_UPGRADE: &'static str = "websocket";
146
147 /// A constant string representing the version of the TLCP protocol used by the library.
148 pub const TLCP_VERSION: &'static str = "TLCP-2.4.0";
149
150 /// Grace period (milliseconds) added on top of the keepalive interval before
151 /// the connection is considered dead by the liveness watchdog.
152 ///
153 /// The server guarantees a message (data or `probe`) at least every keepalive
154 /// interval; the grace absorbs network jitter and scheduling delays.
155 pub const KEEPALIVE_GRACE_MS: u64 = 5_000;
156
157 /// Static method that can be used to share cookies between connections to the Server (performed by
158 /// this library) and connections to other sites that are performed by the application. With this
159 /// method, cookies received by the application can be added (or replaced if already present) to
160 /// the cookie set used by the library to access the Server. Obviously, only cookies whose domain
161 /// is compatible with the Server domain will be used internally.
162 ///
163 /// This method should be invoked before calling the `LightstreamerClient.connect()` method.
164 /// However it can be invoked at any time; it will affect the internal cookie set immediately
165 /// and the sending of cookies on the next HTTP request or WebSocket establishment.
166 ///
167 /// # Parameters
168 ///
169 /// * `uri`: the URI from which the supplied cookies were received. It cannot be `None`.
170 /// * `cookies`: an instance of `http.cookies.SimpleCookie`.
171 ///
172 /// See also `getCookies()`
173 pub fn add_cookies(_uri: &str, _cookies: &Cookie) {
174 // Implementation for add_cookies
175 unimplemented!("Implement mechanism to add cookies to LightstreamerClient");
176 }
177
178 /// Adds a listener that will receive events from the `LightstreamerClient` instance.
179 ///
180 /// The same listener can be added to several different `LightstreamerClient` instances.
181 ///
182 /// A listener can be added at any time. A call to add a listener already present will be ignored.
183 ///
184 /// # Parameters
185 ///
186 /// * `listener`: An object that will receive the events as documented in the `ClientListener`
187 /// interface.
188 ///
189 /// See also `removeListener()`
190 pub fn add_listener(&mut self, listener: Box<dyn ClientListener>) {
191 self.listeners.push(listener);
192 }
193
194 /// Packs s string with the necessary parameters for a subscription request.
195 ///
196 /// # Parameters
197 ///
198 /// * `subscription`: The subscription for which to get the parameters.
199 /// * `request_id`: The request ID to use in the parameters.
200 ///
201 fn get_subscription_params(
202 subscription: &Subscription,
203 request_id: usize,
204 ) -> Result<String, LightstreamerError> {
205 let ls_req_id = request_id.to_string();
206 let ls_sub_id = subscription.id.to_string();
207 let ls_mode = subscription.get_mode().to_string();
208 let ls_group = match subscription.get_item_group() {
209 Some(item_group) => item_group.to_string(),
210 None => match subscription.get_items() {
211 Some(items) => items.join(" "),
212 None => {
213 return Err(LightstreamerError::invalid_argument(
214 "No item group or items found in subscription.",
215 ));
216 }
217 },
218 };
219 let ls_schema = match subscription.get_field_schema() {
220 Some(field_schema) => field_schema.to_string(),
221 None => match subscription.get_fields() {
222 Some(fields) => fields.join(" "),
223 None => {
224 return Err(LightstreamerError::invalid_argument(
225 "No field schema or fields found in subscription.",
226 ));
227 }
228 },
229 };
230 let ls_data_adapter = match subscription.get_data_adapter() {
231 Some(data_adapter) => data_adapter.to_string(),
232 None => "".to_string(),
233 };
234 let ls_snapshot = subscription
235 .get_requested_snapshot()
236 .unwrap_or_default()
237 .to_string();
238 //
239 // Prepare the subscription request.
240 //
241 let mut params: Vec<(&str, &str)> = vec![
242 ("LS_data_adapter", &ls_data_adapter),
243 ("LS_reqId", &ls_req_id),
244 ("LS_op", "add"),
245 ("LS_subId", &ls_sub_id),
246 ("LS_mode", &ls_mode),
247 ("LS_group", &ls_group),
248 ("LS_schema", &ls_schema),
249 ("LS_ack", "false"),
250 ];
251 // Remove the data adapter parameter if not specified.
252 if ls_data_adapter.is_empty() {
253 params.remove(0);
254 }
255 if !ls_snapshot.is_empty() {
256 params.push(("LS_snapshot", &ls_snapshot));
257 }
258
259 Ok(serde_urlencoded::to_string(¶ms)?)
260 }
261
262 fn get_unsubscription_params(
263 subscription_id: usize,
264 request_id: usize,
265 ) -> Result<String, LightstreamerError> {
266 let ls_req_id = request_id.to_string();
267 let ls_sub_id = subscription_id.to_string();
268 //
269 // Prepare the unsubscription request.
270 //
271 let params: Vec<(&str, &str)> = vec![
272 ("LS_reqId", &ls_req_id),
273 ("LS_op", "delete"),
274 ("LS_subId", &ls_sub_id),
275 ];
276
277 Ok(serde_urlencoded::to_string(¶ms)?)
278 }
279
280 /// Operation method that requests to open a Session against the configured Lightstreamer Server.
281 ///
282 /// When `connect()` is called, unless a single transport was forced through `ConnectionOptions.setForcedTransport()`,
283 /// the so called "Stream-Sense" mechanism is started: if the client does not receive any answer
284 /// for some seconds from the streaming connection, then it will automatically open a polling
285 /// connection.
286 ///
287 /// A polling connection may also be opened if the environment is not suitable for a streaming
288 /// connection.
289 ///
290 /// Note that as "polling connection" we mean a loop of polling requests, each of which requires
291 /// opening a synchronous (i.e. not streaming) connection to Lightstreamer Server.
292 ///
293 /// Note that the request to connect is accomplished by the client in a separate thread; this
294 /// means that an invocation to `getStatus()` right after `connect()` might not reflect the change
295 /// yet.
296 ///
297 /// When the request to connect is finally being executed, if the current status of the client
298 /// is not `DISCONNECTED`, then nothing will be done.
299 ///
300 /// # Raises
301 ///
302 /// * `IllegalStateException`: if no server address was configured.
303 ///
304 /// See also `getStatus()`
305 ///
306 /// See also `disconnect()`
307 ///
308 /// See also `ClientListener.onStatusChange()`
309 ///
310 /// See also `ConnectionDetails.setServerAddress()`
311 #[instrument(level = "trace")]
312 pub async fn connect(
313 client: Arc<Mutex<Self>>,
314 shutdown_signal: Arc<Notify>,
315 ) -> Result<(), LightstreamerError> {
316 // If auto-reconnect is enabled, use ConnectionManager
317 let auto_reconnect_enabled = {
318 let client_guard = client.lock().await;
319 client_guard.auto_reconnect_enabled
320 };
321
322 if auto_reconnect_enabled {
323 let (reconnection_config, heartbeat_config) = {
324 let client_guard = client.lock().await;
325 (
326 client_guard.reconnection_config.clone(),
327 client_guard.heartbeat_config.clone(),
328 )
329 };
330
331 let weak_client = Arc::downgrade(&client);
332 let connection_manager: Arc<ConnectionManager> =
333 ConnectionManager::new(weak_client, reconnection_config, heartbeat_config);
334
335 {
336 let mut client_guard = client.lock().await;
337 client_guard.connection_manager = Some(connection_manager.clone());
338 }
339
340 // Use direct connection method
341 return client.lock().await.connect_direct(shutdown_signal).await;
342 }
343
344 // Fallback to direct connection without auto-reconnect
345 let mut client_guard = client.lock().await;
346 client_guard.connect_direct(shutdown_signal).await
347 }
348
349 /// Direct connection method without automatic reconnection
350 #[instrument(level = "trace")]
351 pub async fn connect_direct(
352 &mut self,
353 shutdown_signal: Arc<Notify>,
354 ) -> Result<(), LightstreamerError> {
355 // Check if the server address is configured.
356 if self.server_address.is_none() {
357 return Err(LightstreamerError::invalid_state(
358 "No server address was configured.",
359 ));
360 }
361 //
362 // Only WebSocket streaming transport is currently supported.
363 //
364 let forced_transport = self.connection_options.get_forced_transport();
365 if forced_transport.is_none_or(|t| *t != Transport::WsStreaming) {
366 return Err(LightstreamerError::invalid_state(
367 "Only WebSocket streaming transport is currently supported.",
368 ));
369 }
370 //
371 // Convert the HTTP URL to a WebSocket URL.
372 //
373 let http_url = self
374 .connection_details
375 .get_server_address()
376 .ok_or_else(|| LightstreamerError::invalid_state("Server address is not set."))?;
377 let mut url = Url::parse(http_url).map_err(|e| {
378 LightstreamerError::invalid_state(format!("Failed to parse server address URL: {}", e))
379 })?;
380 match url.scheme() {
381 "http" => url.set_scheme("ws").map_err(|()| {
382 LightstreamerError::invalid_state("Failed to set scheme to ws for WebSocket URL.")
383 })?,
384 "https" => url.set_scheme("wss").map_err(|()| {
385 LightstreamerError::invalid_state("Failed to set scheme to wss for WebSocket URL.")
386 })?,
387 invalid_scheme => {
388 return Err(LightstreamerError::invalid_state(format!(
389 "Unsupported scheme '{}' found when converting HTTP URL to WebSocket URL.",
390 invalid_scheme
391 )));
392 }
393 }
394 let ws_url = url.as_str();
395
396 // Build the WebSocket request with the necessary headers.
397 let request = Request::builder()
398 .uri(ws_url)
399 .header(
400 HeaderName::from_static("connection"),
401 HeaderValue::from_static("Upgrade"),
402 )
403 .header(
404 HeaderName::from_static("host"),
405 HeaderValue::from_str(url.host_str().unwrap_or("localhost")).map_err(|err| {
406 LightstreamerError::invalid_state(format!(
407 "Invalid header value for header with name 'host': {}",
408 err
409 ))
410 })?,
411 )
412 .header(
413 HeaderName::from_static("sec-websocket-key"),
414 HeaderValue::from_static(Self::SEC_WEBSOCKET_KEY),
415 )
416 .header(
417 HeaderName::from_static("sec-websocket-protocol"),
418 HeaderValue::from_static(Self::SEC_WEBSOCKET_PROTOCOL),
419 )
420 .header(
421 HeaderName::from_static("sec-websocket-version"),
422 HeaderValue::from_static(Self::SEC_WEBSOCKET_VERSION),
423 )
424 .header(
425 HeaderName::from_static("upgrade"),
426 HeaderValue::from_static(Self::SEC_WEBSOCKET_UPGRADE),
427 )
428 .body(())?;
429
430 // Connect to the Lightstreamer server using WebSocket.
431 let ws_stream = match connect_async(request).await {
432 Ok((ws_stream, response)) => {
433 if let Some(server_header) = response.headers().get("server") {
434 self.make_log(
435 Level::INFO,
436 &format!(
437 "Connected to Lightstreamer server: {}",
438 server_header.to_str().unwrap_or("")
439 ),
440 );
441 } else {
442 self.make_log(Level::INFO, "Connected to Lightstreamer server");
443 }
444 ws_stream
445 }
446 Err(err) => {
447 return Err(LightstreamerError::connection(format!(
448 "Failed to connect to Lightstreamer server with WebSocket: {}",
449 err
450 )));
451 }
452 };
453
454 // Split the WebSocket stream into a write and a read stream.
455 let (mut write_stream, mut read_stream) = ws_stream.split();
456
457 //
458 // Initiate communication with the server by sending a 'wsok' message.
459 //
460 write_stream.send(Message::Text("wsok".into())).await?;
461
462 //
463 // Start reading and processing messages from the server.
464 //
465 let mut is_connected = false;
466 let mut request_id: usize = 0;
467 let mut _session_id: Option<String> = None;
468 let mut subscription_id: usize = 0;
469 let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> =
470 HashMap::new();
471
472 //
473 // Liveness enforcement and reverse heartbeat.
474 //
475 // The server guarantees that some message (data or `probe`) is sent at
476 // least every keepalive interval. If nothing arrives within the
477 // effective keepalive window plus a grace period, the connection is
478 // considered dead (half-open TCP) and an error is returned so the
479 // caller can reconnect. The window starts from the configured
480 // keepalive interval (if any) and is widened to the server-declared
481 // keepalive from the `conok` message once known.
482 //
483 let configured_keepalive_ms = self.connection_options.get_keepalive_interval();
484 let reverse_heartbeat_ms = self.connection_options.get_reverse_heartbeat_interval();
485 let mut keepalive_timeout_ms: u64 = configured_keepalive_ms;
486 let mut last_message_at = TokioInstant::now();
487
488 let mut heartbeat_timer = if reverse_heartbeat_ms > 0 {
489 let mut timer = tokio::time::interval(Duration::from_millis(reverse_heartbeat_ms));
490 timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
491 Some(timer)
492 } else {
493 None
494 };
495
496 loop {
497 let liveness_deadline = last_message_at
498 + Duration::from_millis(
499 keepalive_timeout_ms.saturating_add(Self::KEEPALIVE_GRACE_MS),
500 );
501 tokio::select! {
502 message = read_stream.next() => {
503 last_message_at = TokioInstant::now();
504 match message {
505 Some(Ok(Message::Text(text))) => {
506 // Messages could include multiple submessages separated by /r/n.
507 // Split the message into submessages and process each one separately.
508 let submessages: Vec<&str> = text.split("\r\n")
509 .filter(|&line| !line.trim().is_empty()) // Filter out empty lines.
510 .collect();
511 for submessage in submessages {
512 let clean_text = clean_message(submessage);
513 let submessage_fields: Vec<&str> = clean_text.split(",").collect();
514 match *submessage_fields.first().unwrap_or(&"") {
515 //
516 // Errors from server.
517 //
518 //
519 // Session-level error: the session is not usable — fail the
520 // connection so the caller can reconnect. (The previous
521 // `break` only exited the submessage loop, leaving a dead
522 // session looping forever.)
523 //
524 "conerr" => {
525 self.make_log( Level::ERROR, &format!("Received connection error from Lightstreamer server: {}", clean_text) );
526 return Err(LightstreamerError::connection(format!(
527 "connection error from server: {}",
528 clean_text
529 )));
530 },
531 //
532 // Request-level error: the session survives — log and continue.
533 //
534 "reqerr" => {
535 self.make_log( Level::ERROR, &format!("Received request error from Lightstreamer server: {}", clean_text) );
536 },
537 //
538 // Session created successfully.
539 //
540 "conok" => {
541 is_connected = true;
542 // CONOK,<session-ID>,<request-limit>,<keep-alive>,<control-link>
543 // Widen the liveness window to the server-declared keepalive
544 // so enforcement works even when no keepalive was configured.
545 if let Some(server_keepalive_ms) = submessage_fields
546 .get(3)
547 .and_then(|v| v.trim().parse::<u64>().ok())
548 .filter(|v| *v > 0)
549 {
550 keepalive_timeout_ms = widen_keepalive_timeout(
551 keepalive_timeout_ms,
552 server_keepalive_ms,
553 );
554 self.make_log( Level::DEBUG, &format!("Server-declared keepalive interval: {} ms", server_keepalive_ms) );
555 }
556 if let Some(session_id) = submessage_fields.get(1) {
557 self.make_log( Level::DEBUG, &format!("Session creation confirmed by server: {}", clean_text) );
558 self.make_log( Level::DEBUG, &format!("Session created with ID: {:?}", session_id) );
559 //
560 // Subscribe to the desired items.
561 //
562 while let Some(subscription) = self.subscriptions.get_mut(subscription_id) {
563 //
564 // Gather all the necessary subscription parameters.
565 //
566 subscription_id += 1;
567 request_id += 1;
568 subscription.id = subscription_id;
569 subscription.id_sender.try_send(subscription_id)?;
570
571 let encoded_params = match Self::get_subscription_params(subscription, request_id)
572 {
573 Ok(params) => params,
574 Err(err) => {
575 return Err(err);
576 },
577 };
578
579 write_stream
580 .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
581 .await?;
582 debug!("Sent subscription request: '{}'", encoded_params);
583 }
584 } else {
585 return Err(LightstreamerError::protocol(
586 "Session ID not found in 'conok' message from server",
587 ));
588 }
589 },
590 //
591 // Notifications from server.
592 //
593 "conf" | "cons" | "clientip" | "servname" | "prog" | "sync" | "eos" => {
594 self.make_log( Level::INFO, &format!("Received notification from server: {}", clean_text) );
595 // Don't do anything with these notifications for now.
596 },
597 "probe" => {
598 self.make_log( Level::DEBUG, &format!("Received probe message from server: {}", clean_text ) );
599 },
600 "reqok" => {
601 self.make_log( Level::DEBUG, &format!("Received reqok message from server: '{}'", clean_text ) );
602 },
603 //
604 // Subscription confirmation from server.
605 //
606 "subok" => {
607 self.make_log( Level::INFO, &format!("Subscription confirmed by server: '{}'", clean_text) );
608 },
609 //
610 // Usubscription confirmation from server.
611 //
612 "unsub" => {
613 self.make_log( Level::INFO, &format!("Unsubscription confirmed by server: '{}'", clean_text) );
614 },
615 //
616 // Data updates from server.
617 //
618 "u" => {
619 // Parse arguments from the received message.
620 let arguments = parse_arguments(&clean_text);
621 //
622 // Extract the subscription from the first argument.
623 //
624 let subscription_index = arguments.get(1).unwrap_or(&"").parse::<usize>().unwrap_or(0);
625 let subscription = match get_subscription_by_id(self.get_subscriptions(), subscription_index) {
626 Some(subscription) => subscription,
627 None => {
628 self.make_log( Level::WARN, &format!("Subscription not found for index: {}", subscription_index) );
629 continue;
630
631 }
632 };
633 //
634 // Extract the item from the second argument.
635 //
636 let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0);
637 let item = match subscription.get_items() {
638 Some(items) => items.get(item_index-1),
639 None => {
640 self.make_log( Level::WARN, &format!("No items found in subscription: {:?}", subscription) );
641 continue;
642 }
643 };
644 //
645 // Determine if the update is a snapshot or real-time update based on the subscription parameters.
646 //
647 let is_snapshot = match subscription.get_requested_snapshot() {
648 Some(ls_snapshot) => {
649 match ls_snapshot {
650 Snapshot::No => false,
651 Snapshot::Yes => {
652 match subscription.get_mode() {
653 SubscriptionMode::Merge => {
654 if arguments.len() == 4 && arguments[3] == "$" {
655 // EOS notification received
656 true
657 } else {
658 // If item doesn't exist in item_updates yet, the first update
659 // is always a snapshot.
660 if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) {
661 if item_updates.get(&(item_index)).is_some() {
662 // Item update already exists in item_updates, so it's not a snapshot.
663 false
664 } else {
665 // Item update doesn't exist in item_updates, so the first update is always a snapshot.
666 true
667 }
668 } else {
669 // Item updates not found for subscription, so the first update is always a snapshot.
670 true
671 }
672 }
673 },
674 SubscriptionMode::Distinct | SubscriptionMode::Command => {
675 !subscription.is_subscribed()
676 },
677 _ => false,
678 }
679 },
680 _ => false,
681 }
682 },
683 None => false,
684 };
685
686 // Extract the field values from the third argument.
687 let field_values: Vec<&str> = arguments.get(3).unwrap_or(&"").split('|').collect();
688
689 //
690 // Get fields from subscription and create a HashMap of field names and values.
691 //
692 let subscription_fields = subscription.get_fields();
693 let mut field_map: HashMap<String, Option<String>> = subscription_fields
694 .map(|fields| fields.iter().map(|field_name| (field_name.to_string(), None)).collect())
695 .unwrap_or_default();
696
697 let mut field_index = 0;
698 for value in field_values {
699 match value {
700 "" => {
701 // An empty value means the field is unchanged compared to the previous update of the same field.
702 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
703 field_map.insert(field_name.to_string(), None);
704 }
705 field_index += 1;
706 }
707 "#" | "$" => {
708 // A value corresponding to a hash sign "#" or dollar sign "$" means the field is null or empty.
709 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
710 field_map.insert(field_name.to_string(), Some("".to_string()));
711 }
712 field_index += 1;
713 }
714 value if value.starts_with('^') => {
715 let command = value.chars().nth(1).unwrap_or(' ');
716 match command {
717 '0'..='9' => {
718 let count = value[1..].parse().unwrap_or(0);
719 for i in 0..count {
720 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index + i)) {
721 field_map.insert(field_name.to_string(), None);
722 }
723 }
724 field_index += count;
725 }
726 'P' | 'T' => {
727 let diff_value = serde_urlencoded::from_str(&value[2..]).unwrap_or_else(|_| value[2..].to_string());
728 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index))
729 && let Some(prev_value) = field_map.get(field_name).and_then(|v| v.as_ref()) {
730 let new_value = match command {
731 'P' => {
732 // Apply JSON Patch
733 let patch: serde_json::Value = serde_json::from_str(&diff_value).unwrap_or(serde_json::Value::Null);
734 let mut prev_json: serde_json::Value = serde_json::from_str(prev_value).unwrap_or(serde_json::Value::Null);
735 let patch_operations: Vec<json_patch::PatchOperation> = serde_json::from_value(patch).unwrap_or_default();
736 if let Err(e) = json_patch::patch(&mut prev_json, &patch_operations) {
737 tracing::warn!("Failed to apply JSON patch: {}", e);
738 }
739 prev_json.to_string()
740 }
741 'T' => {
742 // Apply TLCP-diff
743 //tlcp_diff::apply_diff(prev_value, &diff_value).unwrap_or_else(|_| prev_value.to_string())
744 unimplemented!("Implement TLCP-diff");
745 }
746 _ => unreachable!(),
747 };
748 field_map.insert(field_name.to_string(), Some(new_value.to_string()));
749 }
750 field_index += 1;
751 }
752 _ => {
753 let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
754 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
755 field_map.insert(field_name.to_string(), Some(decoded_value));
756 }
757 field_index += 1;
758 }
759 }
760 }
761 value if value.starts_with('{') => {
762 // in this case it is a json payload that we will let the consumer handle. In this case, it is important
763 // to preserve casing for parsing.
764 let original_json = parse_arguments(submessage).get(3).unwrap_or(&"").split('|').collect::<Vec<&str>>();
765 let mut payload = "";
766 for json in original_json.iter()
767 {
768 if json.is_empty() || *json == "#"
769 {
770 continue;
771 }
772
773 payload = json;
774 }
775
776 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
777 field_map.insert(field_name.to_string(), Some(payload.to_string()));
778 }
779 field_index += 1;
780 }
781 _ => {
782 let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
783 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
784 field_map.insert(field_name.to_string(), Some(decoded_value));
785 }
786 field_index += 1;
787 }
788 }
789 }
790
791 // Store only item_update's changed fields.
792 let changed_fields: HashMap<String, String> = field_map.iter()
793 .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
794 .collect();
795
796 //
797 // Take the proper item_update from item_updates and update it with changed fields.
798 // If the item_update doesn't exist yet, create a new one.
799 //
800 let current_item_update: ItemUpdate;
801 match subscription_item_updates.get_mut(&(subscription_index)) {
802 Some(item_updates) => match item_updates.get_mut(&(item_index)) {
803 Some(item_update) => {
804 //
805 // Iterate changed_fields and update existing item_update.fields assigning the new values.
806 //
807 for (field_name, new_value) in &changed_fields {
808 if item_update.fields.contains_key(field_name) {
809 item_update.fields.insert((*field_name).clone(), Some(new_value.clone()));
810 }
811 }
812 item_update.changed_fields = changed_fields.clone();
813 item_update.is_snapshot = is_snapshot;
814 current_item_update = item_update.clone();
815 },
816 None => {
817 // Create a new item_update and add it to item_updates.
818 let item_update = ItemUpdate {
819 item_name: item.cloned(),
820 item_pos: item_index,
821 fields: field_map.clone(),
822 changed_fields: changed_fields.clone(),
823 is_snapshot,
824 };
825 current_item_update = item_update.clone();
826 item_updates.insert(item_index, item_update);
827 }
828 },
829 None => {
830 // Create a new item_update and add it to item_updates.
831 let item_update = ItemUpdate {
832 item_name: item.cloned(),
833 item_pos: item_index,
834 fields: field_map,
835 changed_fields,
836 is_snapshot,
837 };
838 current_item_update = item_update.clone();
839 let mut item_updates = HashMap::new();
840 item_updates.insert(item_index, item_update);
841 subscription_item_updates.insert(subscription_index, item_updates);
842 }
843 };
844
845 // Get mutable subscription listeners directly.
846 let subscription_listeners = subscription.get_listeners();
847
848 // Iterate subscription listeners and call on_item_update for each listener.
849 for listener in subscription_listeners {
850 listener.on_item_update(¤t_item_update);
851 }
852 }
853 //
854 // Connection confirmation from server.
855 //
856 "wsok" => {
857 self.make_log( Level::INFO, &format!("Connection confirmed by server: '{}'", clean_text) );
858 //
859 // Request session creation.
860 //
861 let ls_adapter_set = match self.connection_details.get_adapter_set() {
862 Some(adapter_set) => adapter_set,
863 None => {
864 return Err(LightstreamerError::invalid_state(
865 "No adapter set found in connection details.",
866 ));
867 },
868 };
869 let ls_send_sync = self.connection_options.get_send_sync().to_string();
870 let ls_keepalive_millis = configured_keepalive_ms.to_string();
871 let ls_inactivity_millis = reverse_heartbeat_ms.to_string();
872 let mut params: Vec<(&str, &str)> = vec![
873 ("LS_adapter_set", ls_adapter_set),
874 ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"),
875 ("LS_send_sync", &ls_send_sync),
876 ];
877 if configured_keepalive_ms > 0 {
878 params.push(("LS_keepalive_millis", &ls_keepalive_millis));
879 }
880 if reverse_heartbeat_ms > 0 {
881 params.push(("LS_inactivity_millis", &ls_inactivity_millis));
882 }
883 if let Some(user) = &self.connection_details.get_user() {
884 params.push(("LS_user", user));
885 }
886 if let Some(password) = &self.connection_details.get_password() {
887 params.push(("LS_password", password));
888 }
889 params.push(("LS_protocol", Self::TLCP_VERSION));
890 let encoded_params = serde_urlencoded::to_string(¶ms)?;
891 write_stream
892 .send(Message::Text(format!("create_session\r\n{}\n", encoded_params).into()))
893 .await?;
894 self.make_log( Level::DEBUG, &format!("Sent create session request: '{}'", encoded_params) );
895 },
896 unexpected_message => {
897 return Err(LightstreamerError::protocol(format!(
898 "Unexpected message received from server: '{:?}'",
899 unexpected_message
900 )));
901 },
902 }
903 }
904 },
905 Some(Ok(non_text_message)) => {
906 return Err(LightstreamerError::protocol(format!(
907 "Unexpected non-text message from server: {:?}",
908 non_text_message
909 )));
910 },
911 Some(Err(err)) => {
912 return Err(LightstreamerError::protocol(format!(
913 "Error reading message from server: {}",
914 err
915 )));
916 },
917 None => {
918 self.make_log( Level::DEBUG, "No more messages from server" );
919 break;
920 },
921 }
922 },
923 Some(subscription_request) = self.subscription_receiver.recv() => {
924 request_id += 1;
925 // Process subscription requests.
926 if let Some(subscription) = subscription_request.subscription
927 {
928 self.subscriptions.push(subscription);
929
930 // if we are not connected yet, we will subscribe later
931 if !is_connected {
932 continue;
933 }
934
935 subscription_id += 1;
936 // SAFETY: We just pushed a subscription, so last_mut() must return Some.
937 // Using debug_assert to catch invariant violations during development.
938 let sub = self.subscriptions.last_mut();
939 debug_assert!(sub.is_some(), "subscriptions.last_mut() returned None after push()");
940 if let Some(sub) = sub {
941 sub.id = subscription_id;
942 sub.id_sender.try_send(subscription_id)?;
943 }
944
945 // SAFETY: We just pushed a subscription, so last() must return Some.
946 // Extract encoded_params in a separate scope to avoid borrow across await.
947 let encoded_params = {
948 let last_sub = self.subscriptions.last();
949 debug_assert!(last_sub.is_some(), "subscriptions.last() returned None after push()");
950 let Some(last_sub) = last_sub else {
951 return Err(LightstreamerError::invalid_state(
952 "subscriptions.last() returned None immediately after push()"
953 ));
954 };
955 Self::get_subscription_params(last_sub, request_id)?
956 };
957
958 write_stream
959 .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
960 .await?;
961
962 self.make_log( Level::INFO, &format!("Sent subscription request: '{}'", encoded_params) );
963 }
964 // Process unsubscription requests.
965 else if let Some(unsubscription_id) = subscription_request.subscription_id
966 {
967 let encoded_params = match Self::get_unsubscription_params(unsubscription_id, request_id)
968 {
969 Ok(params) => params,
970 Err(err) => {
971 return Err(err);
972 },
973 };
974
975 write_stream
976 .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
977 .await?;
978
979 self.make_log( Level::INFO, &format!("Sent unsubscription request: '{}'", encoded_params) );
980
981 self.subscriptions.retain(|s| s.id != unsubscription_id);
982
983 if self.subscriptions.is_empty()
984 {
985 self.make_log( Level::INFO, "No more subscriptions, disconnecting" );
986 shutdown_signal.notify_one();
987 }
988 }
989 },
990 _ = shutdown_signal.notified() => {
991 self.make_log( Level::INFO, "Received shutdown signal" );
992 break;
993 },
994 //
995 // Liveness watchdog: nothing received within the keepalive
996 // window plus grace — the connection is half-open / dead.
997 //
998 _ = tokio::time::sleep_until(liveness_deadline), if keepalive_timeout_ms > 0 => {
999 let idle_ms = keepalive_timeout_ms.saturating_add(Self::KEEPALIVE_GRACE_MS);
1000 self.make_log(
1001 Level::ERROR,
1002 &format!(
1003 "No message received from server within {} ms (keepalive + grace); connection considered dead",
1004 idle_ms
1005 ),
1006 );
1007 return Err(LightstreamerError::connection(format!(
1008 "no message received from server within {} ms (keepalive + grace)",
1009 idle_ms
1010 )));
1011 },
1012 //
1013 // Reverse heartbeat: prove client liveness to the server and
1014 // surface write errors promptly on a dead connection.
1015 //
1016 _ = async {
1017 match heartbeat_timer.as_mut() {
1018 Some(timer) => { timer.tick().await; },
1019 None => std::future::pending::<()>().await,
1020 }
1021 }, if is_connected => {
1022 write_stream.send(Message::Text("heartbeat\r\n".into())).await?;
1023 self.make_log(Level::DEBUG, "Sent reverse heartbeat to server");
1024 },
1025 }
1026 }
1027
1028 Ok(())
1029 }
1030
1031 /// Operation method that requests to close the Session opened against the configured Lightstreamer
1032 /// Server (if any).
1033 ///
1034 /// When `disconnect()` is called, the "Stream-Sense" mechanism is stopped.
1035 ///
1036 /// Note that active `Subscription` instances, associated with this `LightstreamerClient` instance,
1037 /// are preserved to be re-subscribed to on future Sessions.
1038 ///
1039 /// Note that the request to disconnect is accomplished by the client in a separate thread; this
1040 /// means that an invocation to `getStatus()` right after `disconnect()` might not reflect the
1041 /// change yet.
1042 ///
1043 /// When the request to disconnect is finally being executed, if the status of the client is
1044 /// "DISCONNECTED", then nothing will be done.
1045 ///
1046 /// See also `connect()`
1047 #[instrument(level = "trace")]
1048 pub async fn disconnect(&mut self) {
1049 self.make_log(Level::INFO, "Disconnecting from Lightstreamer server");
1050
1051 // If auto-reconnect is enabled and we have a connection manager, stop it
1052 if self.auto_reconnect_enabled {
1053 if let Some(ref manager) = self.connection_manager {
1054 manager.shutdown().await;
1055 }
1056 self.connection_manager = None;
1057 }
1058
1059 // Update status to disconnected
1060 self.status = ClientStatus::Disconnected(DisconnectionType::WillRetry);
1061
1062 // Notify listeners about status change
1063 for listener in &self.listeners {
1064 listener.on_status_change(&self.status.to_string());
1065 }
1066 }
1067
1068 /// Static inquiry method that can be used to share cookies between connections to the Server
1069 /// (performed by this library) and connections to other sites that are performed by the application.
1070 /// With this method, cookies received from the Server can be extracted for sending through other
1071 /// connections, according with the URI to be accessed.
1072 ///
1073 /// See `addCookies()` for clarifications on when cookies are directly stored by the library and
1074 /// when not.
1075 ///
1076 /// # Parameters
1077 ///
1078 /// * `uri`: the URI to which the cookies should be sent, or `None`.
1079 ///
1080 /// # Returns
1081 ///
1082 /// A list with the various cookies that can be sent in a HTTP request for the specified URI.
1083 /// If a `None` URI was supplied, all available non-expired cookies will be returned.
1084 pub fn get_cookies(_uri: Option<&str>) -> Cookie<'_> {
1085 // Implementation for get_cookies
1086 unimplemented!()
1087 }
1088
1089 /// Returns a list containing the `ClientListener` instances that were added to this client.
1090 ///
1091 /// # Returns
1092 ///
1093 /// A list containing the listeners that were added to this client.
1094 ///
1095 /// See also `addListener()`
1096 pub fn get_listeners(&self) -> &Vec<Box<dyn ClientListener>> {
1097 &self.listeners
1098 }
1099
1100 /// Inquiry method that gets the current client status and transport (when applicable).
1101 ///
1102 /// # Returns
1103 ///
1104 /// The current client status. It can be one of the following values:
1105 ///
1106 /// - `"CONNECTING"`: the client is waiting for a Server's response in order to establish a connection;
1107 /// - `"CONNECTED:STREAM-SENSING"`: the client has received a preliminary response from the server
1108 /// and is currently verifying if a streaming connection is possible;
1109 /// - `"CONNECTED:WS-STREAMING"`: a streaming connection over WebSocket is active;
1110 /// - `"CONNECTED:HTTP-STREAMING"`: a streaming connection over HTTP is active;
1111 /// - `"CONNECTED:WS-POLLING"`: a polling connection over WebSocket is in progress;
1112 /// - `"CONNECTED:HTTP-POLLING"`: a polling connection over HTTP is in progress;
1113 /// - `"STALLED"`: the Server has not been sending data on an active streaming connection for
1114 /// longer than a configured time;
1115 /// - `"DISCONNECTED:WILL-RETRY"`: no connection is currently active but one will be opened
1116 /// (possibly after a timeout);
1117 /// - `"DISCONNECTED:TRYING-RECOVERY"`: no connection is currently active, but one will be opened
1118 /// as soon as possible, as an attempt to recover the current session after a connection issue;
1119 /// - `"DISCONNECTED"`: no connection is currently active.
1120 ///
1121 /// See also `ClientListener.onStatusChange()`
1122 pub fn get_status(&self) -> &ClientStatus {
1123 &self.status
1124 }
1125
1126 /// Inquiry method that returns a list containing all the `Subscription` instances that are
1127 /// currently "active" on this `LightstreamerClient`.
1128 ///
1129 /// Internal second-level `Subscription` are not included.
1130 ///
1131 /// # Returns
1132 ///
1133 /// A list, containing all the `Subscription` currently "active" on this `LightstreamerClient`.
1134 /// The list can be empty.
1135 ///
1136 /// See also `subscribe()`
1137 pub fn get_subscriptions(&self) -> &Vec<Subscription> {
1138 &self.subscriptions
1139 }
1140
1141 /// Creates a new instance of `LightstreamerClient`.
1142 ///
1143 /// The constructor initializes the client with the server address and adapter set, if provided.
1144 /// It sets up the connection details and options for the client. If no server address or
1145 /// adapter set is specified, those properties on the client will be `None`. This allows
1146 /// for late configuration of these details before connecting to the Lightstreamer server.
1147 ///
1148 /// # Arguments
1149 /// * `server_address` - An optional reference to a string slice that represents the server
1150 /// address to connect to. If `None`, the server address must be set later.
1151 /// * `adapter_set` - An optional reference to a string slice that specifies the adapter set name.
1152 /// If `None`, the adapter set must be set later.
1153 ///
1154 /// # Returns
1155 /// A result containing the new `LightstreamerClient` instance if successful, or an
1156 /// `IllegalStateException` if the initialization fails due to invalid state conditions.
1157 ///
1158 /// # Panics
1159 /// Does not panic under normal circumstances. However, unexpected internal errors during
1160 /// the creation of internal components could cause panics, which should be considered when
1161 /// using this function in production code.
1162 ///
1163 pub fn new(
1164 server_address: Option<&str>,
1165 adapter_set: Option<&str>,
1166 username: Option<&str>,
1167 password: Option<&str>,
1168 ) -> Result<LightstreamerClient, LightstreamerError> {
1169 let connection_details =
1170 ConnectionDetails::new(server_address, adapter_set, username, password)?;
1171 let connection_options = ConnectionOptions::default();
1172 let (subscription_sender, subscription_receiver) = channel(100);
1173
1174 Ok(LightstreamerClient {
1175 server_address: server_address.map(|s| s.to_string()),
1176 adapter_set: adapter_set.map(|s| s.to_string()),
1177 connection_details,
1178 connection_options,
1179 listeners: Vec::new(),
1180 subscriptions: Vec::new(),
1181 status: ClientStatus::Disconnected(DisconnectionType::WillRetry),
1182 logging: LogType::StdLogs,
1183 subscription_sender,
1184 subscription_receiver,
1185 connection_manager: None,
1186 reconnection_config: ReconnectionConfig::default(),
1187 heartbeat_config: HeartbeatConfig::default(),
1188 auto_reconnect_enabled: false,
1189 })
1190 }
1191
1192 /// Removes a listener from the `LightstreamerClient` instance so that it will not receive
1193 /// events anymore.
1194 ///
1195 /// A listener can be removed at any time.
1196 ///
1197 /// # Parameters
1198 ///
1199 /// * `listener`: The listener to be removed.
1200 ///
1201 /// See also `addListener()`
1202 pub fn remove_listener(&mut self, _listener: Box<dyn ClientListener>) {
1203 unimplemented!("Implement mechanism to remove listener from LightstreamerClient");
1204 //self.listeners.remove(&listener);
1205 }
1206
1207 /// Operation method that sends a message to the Server. The message is interpreted and handled
1208 /// by the Metadata Adapter associated to the current Session. This operation supports in-order
1209 /// guaranteed message delivery with automatic batching. In other words, messages are guaranteed
1210 /// to arrive exactly once and respecting the original order, whatever is the underlying transport
1211 /// (HTTP or WebSockets). Furthermore, high frequency messages are automatically batched, if
1212 /// necessary, to reduce network round trips.
1213 ///
1214 /// Upon subsequent calls to the method, the sequential management of the involved messages is
1215 /// guaranteed. The ordering is determined by the order in which the calls to `sendMessage` are
1216 /// issued.
1217 ///
1218 /// If a message, for any reason, doesn't reach the Server (this is possible with the HTTP transport),
1219 /// it will be resent; however, this may cause the subsequent messages to be delayed. For this
1220 /// reason, each message can specify a "delayTimeout", which is the longest time the message,
1221 /// after reaching the Server, can be kept waiting if one of more preceding messages haven't
1222 /// been received yet. If the "delayTimeout" expires, these preceding messages will be discarded;
1223 /// any discarded message will be notified to the listener through `ClientMessageListener.onDiscarded()`.
1224 /// Note that, because of the parallel transport of the messages, if a zero or very low timeout
1225 /// is set for a message and the previous message was sent immediately before, it is possible
1226 /// that the latter gets discarded even if no communication issues occur. The Server may also
1227 /// enforce its own timeout on missing messages, to prevent keeping the subsequent messages for
1228 /// long time.
1229 ///
1230 /// Sequence identifiers can also be associated with the messages. In this case, the sequential
1231 /// management is restricted to all subsets of messages with the same sequence identifier associated.
1232 ///
1233 /// Notifications of the operation outcome can be received by supplying a suitable listener. The
1234 /// supplied listener is guaranteed to be eventually invoked; listeners associated with a sequence
1235 /// are guaranteed to be invoked sequentially.
1236 ///
1237 /// The "UNORDERED_MESSAGES" sequence name has a special meaning. For such a sequence, immediate
1238 /// processing is guaranteed, while strict ordering and even sequentialization of the processing
1239 /// is not enforced. Likewise, strict ordering of the notifications is not enforced. However,
1240 /// messages that, for any reason, should fail to reach the Server whereas subsequent messages
1241 /// had succeeded, might still be discarded after a server-side timeout, in order to ensure that
1242 /// the listener eventually gets a notification.
1243 ///
1244 /// Moreover, if "UNORDERED_MESSAGES" is used and no listener is supplied, a "fire and forget"
1245 /// scenario is assumed. In this case, no checks on missing, duplicated or overtaken messages
1246 /// are performed at all, so as to optimize the processing and allow the highest possible throughput.
1247 ///
1248 /// Since a message is handled by the Metadata Adapter associated to the current connection, a
1249 /// message can be sent only if a connection is currently active. If the special `enqueueWhileDisconnected`
1250 /// flag is specified it is possible to call the method at any time and the client will take
1251 /// care of sending the message as soon as a connection is available, otherwise, if the current
1252 /// status is "DISCONNECTED*", the message will be abandoned and the `ClientMessageListener.onAbort()`
1253 /// event will be fired.
1254 ///
1255 /// Note that, in any case, as soon as the status switches again to "DISCONNECTED*", any message
1256 /// still pending is aborted, including messages that were queued with the `enqueueWhileDisconnected`
1257 /// flag set to `true`.
1258 ///
1259 /// Also note that forwarding of the message to the server is made in a separate thread, hence,
1260 /// if a message is sent while the connection is active, it could be aborted because of a subsequent
1261 /// disconnection. In the same way a message sent while the connection is not active might be
1262 /// sent because of a subsequent connection.
1263 ///
1264 /// # Parameters
1265 ///
1266 /// * `message`: a text message, whose interpretation is entirely demanded to the Metadata Adapter
1267 /// associated to the current connection.
1268 /// * `sequence`: an alphanumeric identifier, used to identify a subset of messages to be managed
1269 /// in sequence; underscore characters are also allowed. If the "UNORDERED_MESSAGES" identifier
1270 /// is supplied, the message will be processed in the special way described above. The parameter
1271 /// is optional; if set to `None`, "UNORDERED_MESSAGES" is used as the sequence name.
1272 /// * `delay_timeout`: a timeout, expressed in milliseconds. If higher than the Server configured
1273 /// timeout on missing messages, the latter will be used instead. The parameter is optional; if
1274 /// a negative value is supplied, the Server configured timeout on missing messages will be applied.
1275 /// This timeout is ignored for the special "UNORDERED_MESSAGES" sequence, although a server-side
1276 /// timeout on missing messages still applies.
1277 /// * `listener`: an object suitable for receiving notifications about the processing outcome. The
1278 /// parameter is optional; if not supplied, no notification will be available.
1279 /// * `enqueue_while_disconnected`: if this flag is set to `true`, and the client is in a disconnected
1280 /// status when the provided message is handled, then the message is not aborted right away but
1281 /// is queued waiting for a new session. Note that the message can still be aborted later when
1282 /// a new session is established.
1283 pub fn send_message(
1284 &mut self,
1285 message: &str,
1286 sequence: Option<&str>,
1287 _delay_timeout: Option<u64>,
1288 listener: Option<Box<dyn ClientMessageListener>>,
1289 enqueue_while_disconnected: bool,
1290 ) {
1291 let _sequence = sequence.unwrap_or("UNORDERED_MESSAGES");
1292
1293 // Handle the message based on the current connection status
1294 match &self.status {
1295 ClientStatus::Connected(_connection_type) => {
1296 // Send the message to the server in a separate thread
1297 // ...
1298 }
1299 ClientStatus::Disconnected(_disconnection_type) => {
1300 if enqueue_while_disconnected {
1301 // Enqueue the message to be sent when a connection is available
1302 // ...
1303 } else {
1304 // Abort the message and notify the listener
1305 if let Some(listener) = listener {
1306 listener.on_abort(message, false);
1307 }
1308 }
1309 }
1310 _ => {
1311 // Enqueue the message to be sent when a connection is available
1312 // ...
1313 }
1314 }
1315 unimplemented!("Complete mechanism to send message to LightstreamerClient.");
1316 }
1317
1318 /// Static method that permits to configure the logging system used by the library. The logging
1319 /// system must respect the `LoggerProvider` interface. A custom class can be used to wrap any
1320 /// third-party logging system.
1321 ///
1322 /// If no logging system is specified, all the generated log is discarded.
1323 ///
1324 /// The following categories are available to be consumed:
1325 ///
1326 /// - `lightstreamer.stream`: logs socket activity on Lightstreamer Server connections; at INFO
1327 /// level, socket operations are logged; at DEBUG level, read/write data exchange is logged.
1328 /// - `lightstreamer.protocol`: logs requests to Lightstreamer Server and Server answers; at INFO
1329 /// level, requests are logged; at DEBUG level, request details and events from the Server are logged.
1330 /// - `lightstreamer.session`: logs Server Session lifecycle events; at INFO level, lifecycle events
1331 /// are logged; at DEBUG level, lifecycle event details are logged.
1332 /// - `lightstreamer.subscriptions`: logs subscription requests received by the clients and the related
1333 /// updates; at WARN level, alert events from the Server are logged; at INFO level, subscriptions
1334 /// and unsubscriptions are logged; at DEBUG level, requests batching and update details are logged.
1335 /// - `lightstreamer.actions`: logs settings / API calls.
1336 ///
1337 /// # Parameters
1338 ///
1339 /// * `provider`: A `LoggerProvider` instance that will be used to generate log messages by the
1340 /// library classes.
1341 pub fn set_logger_provider() {
1342 unimplemented!("Implement mechanism to set logger provider for LightstreamerClient.");
1343 }
1344 /*
1345 pub fn set_logger_provider(provider: LoggerProvider) {
1346 // Implementation for set_logger_provider
1347 }
1348 */
1349
1350 /// Provides a mean to control the way TLS certificates are evaluated, with the possibility to
1351 /// accept untrusted ones.
1352 ///
1353 /// May be called only once before creating any `LightstreamerClient` instance.
1354 ///
1355 /// # Parameters
1356 ///
1357 /// * `factory`: an instance of `ssl.SSLContext`
1358 ///
1359 /// # Raises
1360 ///
1361 /// * `IllegalArgumentException`: if the factory is `None`
1362 /// * `IllegalStateException`: if a factory is already installed
1363 pub fn set_trust_manager_factory() {
1364 unimplemented!("Implement mechanism to set trust manager factory for LightstreamerClient.");
1365 }
1366 /*
1367 pub fn set_trust_manager_factory(factory: Option<SslContext>) -> Result<(), IllegalArgumentException> {
1368 if factory.is_none() {
1369 return Err(IllegalArgumentException::new(
1370 "Factory cannot be None",
1371 ));
1372 }
1373
1374 // Implementation for set_trust_manager_factory
1375 Ok(())
1376 }
1377 */
1378
1379 /// Adds a subscription to the `LightstreamerClient` instance.
1380 ///
1381 /// Active subscriptions are subscribed to through the server as soon as possible (i.e. as soon
1382 /// as there is a session available). Active `Subscription` are automatically persisted across different
1383 /// sessions as long as a related unsubscribe call is not issued.
1384 ///
1385 /// Subscriptions can be given to the `LightstreamerClient` at any time. Once done the `Subscription`
1386 /// immediately enters the "active" state.
1387 ///
1388 /// Once "active", a `Subscription` instance cannot be provided again to a `LightstreamerClient`
1389 /// unless it is first removed from the "active" state through a call to `unsubscribe()`.
1390 ///
1391 /// Also note that forwarding of the subscription to the server is made in a separate thread.
1392 ///
1393 /// A successful subscription to the server will be notified through a `SubscriptionListener.onSubscription()`
1394 /// event.
1395 ///
1396 /// # Parameters
1397 ///
1398 /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1399 /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1400 /// values.
1401 ///
1402 /// See also `unsubscribe()`
1403 ///
1404 /// # Errors
1405 ///
1406 /// Returns an error if the subscription channel is closed.
1407 pub async fn subscribe(
1408 subscription_sender: Sender<SubscriptionRequest>,
1409 subscription: Subscription,
1410 ) -> Result<(), LightstreamerError> {
1411 subscription_sender
1412 .send(SubscriptionRequest {
1413 subscription: Some(subscription),
1414 subscription_id: None,
1415 })
1416 .await
1417 .map_err(|e| LightstreamerError::channel(format!("Failed to send subscription: {}", e)))
1418 }
1419
1420 /// If you want to be able to unsubscribe from a subscription, you need to keep track of the id
1421 /// of the subscription. This blocking method allows you to wait for the id of the subscription
1422 /// to be returned.
1423 ///
1424 /// # Parameters
1425 ///
1426 /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1427 /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1428 ///
1429 pub async fn subscribe_get_id(
1430 subscription_sender: Sender<SubscriptionRequest>,
1431 mut subscription: Subscription,
1432 ) -> Result<usize, LightstreamerError> {
1433 // Extract the id_receiver before sending the subscription
1434 let mut id_receiver = subscription.id_receiver;
1435
1436 // Create a new channel for the subscription we're about to send
1437 let (_new_sender, new_receiver) = channel(1);
1438 subscription.id_receiver = new_receiver;
1439
1440 // Send the subscription
1441 LightstreamerClient::subscribe(subscription_sender, subscription).await?;
1442
1443 // Wait for the ID to be updated through the channel
1444 match id_receiver.recv().await {
1445 Some(id) => Ok(id),
1446 None => Err(LightstreamerError::invalid_state(
1447 "Failed to get subscription id",
1448 )),
1449 }
1450 }
1451
1452 /// Operation method that removes a `Subscription` that is currently in the "active" state.
1453 ///
1454 /// By bringing back a `Subscription` to the "inactive" state, the unsubscription from all its
1455 /// items is requested to Lightstreamer Server.
1456 ///
1457 /// Subscription can be unsubscribed from at any time. Once done the `Subscription` immediately
1458 /// exits the "active" state.
1459 ///
1460 /// Note that forwarding of the unsubscription to the server is made in a separate thread.
1461 ///
1462 /// The unsubscription will be notified through a `SubscriptionListener.onUnsubscription()` event.
1463 ///
1464 /// # Parameters
1465 ///
1466 /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1467 /// * `subscription_id`: The id of the subscription to be unsubscribed from.
1468 /// instance.
1469 ///
1470 /// # Errors
1471 ///
1472 /// Returns an error if the subscription channel is closed.
1473 pub async fn unsubscribe(
1474 subscription_sender: Sender<SubscriptionRequest>,
1475 subscription_id: usize,
1476 ) -> Result<(), LightstreamerError> {
1477 subscription_sender
1478 .send(SubscriptionRequest {
1479 subscription: None,
1480 subscription_id: Some(subscription_id),
1481 })
1482 .await
1483 .map_err(|e| {
1484 LightstreamerError::channel(format!("Failed to send unsubscription: {}", e))
1485 })
1486 }
1487
1488 /// Method setting enum for the logging of this instance.
1489 ///
1490 /// Default logging type is StdLogs, corresponding to `stdout`
1491 ///
1492 /// `LightstreamerClient` has methods for logging that are compatible with the `Tracing` crate.
1493 /// Enabling logging for the `Tracing` crate requires implementation of a tracing subscriber
1494 /// and its configuration and formatting.
1495 ///
1496 /// # Parameters
1497 ///
1498 /// * `logging`: An enum declaring the logging type of this `LightstreamerClient` instance.
1499 pub fn set_logging_type(&mut self, logging: LogType) {
1500 self.logging = logging;
1501 }
1502
1503 /// Method for logging messages
1504 ///
1505 /// Match case wraps log types. `loglevel` param ignored in StdLogs case, all output to stdout.
1506 ///
1507 /// # Parameters
1508 ///
1509 /// * `loglevel` Enum determining use of stdout or Tracing subscriber.
1510 pub fn make_log(&mut self, loglevel: Level, log: &str) {
1511 match self.logging {
1512 LogType::StdLogs => {
1513 debug!("{}", log);
1514 }
1515 LogType::TracingLogs => match loglevel {
1516 Level::INFO => {
1517 info!(log);
1518 }
1519 Level::WARN => {
1520 warn!(log);
1521 }
1522 Level::ERROR => {
1523 error!(log);
1524 }
1525 Level::TRACE => {
1526 trace!(log);
1527 }
1528 Level::DEBUG => {
1529 debug!(log);
1530 }
1531 },
1532 }
1533 }
1534
1535 /// Enables automatic reconnection with default configuration.
1536 pub fn enable_auto_reconnect(&mut self) {
1537 self.auto_reconnect_enabled = true;
1538 // ConnectionManager will be created during connect() to avoid circular dependency
1539 }
1540
1541 /// Enables automatic reconnection with the specified configuration.
1542 ///
1543 /// # Parameters
1544 ///
1545 /// * `reconnection_config`: Configuration for reconnection behavior
1546 /// * `heartbeat_config`: Configuration for heartbeat monitoring
1547 pub fn enable_auto_reconnect_with_config(
1548 &mut self,
1549 reconnection_config: ReconnectionConfig,
1550 heartbeat_config: HeartbeatConfig,
1551 ) -> Result<(), LightstreamerError> {
1552 self.auto_reconnect_enabled = true;
1553 self.reconnection_config = reconnection_config;
1554 self.heartbeat_config = heartbeat_config;
1555 self.auto_reconnect_enabled = true;
1556 // ConnectionManager will be created during connect() to avoid circular dependency
1557 Ok(())
1558 }
1559
1560 /// Disables automatic reconnection.
1561 pub async fn disable_auto_reconnect(&mut self) {
1562 self.auto_reconnect_enabled = false;
1563 if let Some(manager) = &self.connection_manager {
1564 manager.shutdown().await;
1565 }
1566 self.connection_manager = None;
1567 }
1568
1569 /// Returns whether automatic reconnection is currently enabled.
1570 pub fn is_auto_reconnect_enabled(&self) -> bool {
1571 self.auto_reconnect_enabled
1572 }
1573
1574 /// Gets the current reconnection configuration.
1575 pub fn get_reconnection_config(&self) -> &ReconnectionConfig {
1576 &self.reconnection_config
1577 }
1578
1579 /// Gets the current heartbeat configuration.
1580 pub fn get_heartbeat_config(&self) -> &HeartbeatConfig {
1581 &self.heartbeat_config
1582 }
1583
1584 /// Updates the reconnection configuration.
1585 pub fn set_reconnection_config(&mut self, config: ReconnectionConfig) {
1586 self.reconnection_config = config;
1587 // Note: ConnectionManager doesn't support runtime config updates
1588 // The configuration is set during ConnectionManager creation
1589 }
1590
1591 /// Updates the heartbeat configuration.
1592 pub fn set_heartbeat_config(&mut self, config: HeartbeatConfig) {
1593 self.heartbeat_config = config;
1594 // Note: ConnectionManager doesn't support runtime config updates
1595 // The configuration is set during ConnectionManager creation
1596 }
1597
1598 /// Gets the current connection state from the connection manager.
1599 pub async fn get_connection_state(&self) -> ConnectionState {
1600 if let Some(manager) = &self.connection_manager {
1601 manager.get_connection_state().await
1602 } else {
1603 ConnectionState::Disconnected
1604 }
1605 }
1606
1607 /// Gets connection metrics from the connection manager.
1608 pub async fn get_connection_metrics(&self) -> crate::connection::management::ConnectionMetrics {
1609 if let Some(manager) = &self.connection_manager {
1610 manager.get_metrics().await
1611 } else {
1612 crate::connection::management::ConnectionMetrics::default()
1613 }
1614 }
1615
1616 /// Forces an immediate reconnection attempt if auto-reconnect is enabled.
1617 pub async fn force_reconnect(&mut self) -> Result<(), LightstreamerError> {
1618 if !self.auto_reconnect_enabled {
1619 return Err(LightstreamerError::invalid_state(
1620 "Auto-reconnect is not enabled",
1621 ));
1622 }
1623
1624 if let Some(manager) = &mut self.connection_manager {
1625 manager.force_reconnect().await?;
1626 }
1627
1628 Ok(())
1629 }
1630}
1631
1632/// Computes the effective liveness window after the server declares its
1633/// keepalive interval in the `conok` message.
1634///
1635/// Returns the server-declared value when no window was configured
1636/// (`current_ms == 0`), otherwise the larger of the two — the window may only
1637/// widen, never shrink, so a server declaring a longer keepalive cannot cause
1638/// false-positive disconnects.
1639#[must_use]
1640#[inline]
1641fn widen_keepalive_timeout(current_ms: u64, server_declared_ms: u64) -> u64 {
1642 if current_ms == 0 {
1643 server_declared_ms
1644 } else {
1645 current_ms.max(server_declared_ms)
1646 }
1647}
1648
1649#[cfg(test)]
1650mod tests {
1651 use super::*;
1652 use crate::subscription::{Subscription, SubscriptionListener, SubscriptionMode};
1653
1654 use std::fmt::Debug;
1655 use std::sync::{Arc, Mutex};
1656 use tokio::sync::Notify;
1657
1658 #[derive(Debug)]
1659 struct MockClientListener {
1660 property_changes: Arc<Mutex<Vec<String>>>,
1661 status_changes: Arc<Mutex<Vec<String>>>,
1662 server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1663 }
1664
1665 impl MockClientListener {
1666 fn new() -> Self {
1667 MockClientListener {
1668 property_changes: Arc::new(Mutex::new(Vec::new())),
1669 status_changes: Arc::new(Mutex::new(Vec::new())),
1670 server_errors: Arc::new(Mutex::new(Vec::new())),
1671 }
1672 }
1673
1674 #[allow(dead_code)]
1675 fn with_shared_data(
1676 property_changes: Arc<Mutex<Vec<String>>>,
1677 status_changes: Arc<Mutex<Vec<String>>>,
1678 server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1679 ) -> Self {
1680 MockClientListener {
1681 property_changes,
1682 status_changes,
1683 server_errors,
1684 }
1685 }
1686 }
1687
1688 impl ClientListener for MockClientListener {
1689 fn on_property_change(&self, property: &str) {
1690 if let Ok(mut guard) = self.property_changes.lock() {
1691 guard.push(property.to_string());
1692 }
1693 }
1694
1695 fn on_status_change(&self, status: &str) {
1696 if let Ok(mut guard) = self.status_changes.lock() {
1697 guard.push(status.to_string());
1698 }
1699 }
1700
1701 fn on_server_error(&self, code: i32, message: &str) {
1702 if let Ok(mut guard) = self.server_errors.lock() {
1703 guard.push((code, message.to_string()));
1704 }
1705 }
1706 }
1707
1708 #[allow(dead_code)]
1709 struct MockSubscriptionListener;
1710
1711 impl SubscriptionListener for MockSubscriptionListener {
1712 fn on_subscription(&mut self) {}
1713 fn on_unsubscription(&mut self) {}
1714 fn on_item_update(&self, _update: &ItemUpdate) {}
1715 }
1716
1717 impl Debug for MockSubscriptionListener {
1718 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1719 write!(f, "MockSubscriptionListener")
1720 }
1721 }
1722
1723 #[allow(dead_code)]
1724 struct LightstreamerClientTestContext {
1725 client: LightstreamerClient,
1726 property_changes: Arc<Mutex<Vec<String>>>,
1727 status_changes: Arc<Mutex<Vec<String>>>,
1728 server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1729 }
1730
1731 impl LightstreamerClientTestContext {
1732 #[allow(dead_code)]
1733 fn new() -> Result<Self, LightstreamerError> {
1734 let property_changes = Arc::new(Mutex::new(Vec::new()));
1735 let status_changes = Arc::new(Mutex::new(Vec::new()));
1736 let server_errors = Arc::new(Mutex::new(Vec::new()));
1737 let listener = MockClientListener::with_shared_data(
1738 Arc::clone(&property_changes),
1739 Arc::clone(&status_changes),
1740 Arc::clone(&server_errors),
1741 );
1742
1743 let mut client = LightstreamerClient::new(
1744 Some("http://test.lightstreamer.com"),
1745 Some("DEMO"),
1746 None,
1747 None,
1748 )?;
1749 client.add_listener(Box::new(listener));
1750
1751 Ok(LightstreamerClientTestContext {
1752 client,
1753 property_changes,
1754 status_changes,
1755 server_errors,
1756 })
1757 }
1758 }
1759
1760 #[test]
1761 fn test_new_lightstreamer_client() -> Result<(), LightstreamerError> {
1762 let client = LightstreamerClient::new(
1763 Some("http://test.lightstreamer.com"),
1764 Some("DEMO"),
1765 None,
1766 None,
1767 )?;
1768 assert_eq!(
1769 client.server_address,
1770 Some("http://test.lightstreamer.com".to_string())
1771 );
1772 assert_eq!(client.adapter_set, Some("DEMO".to_string()));
1773 let client = LightstreamerClient::new(
1774 Some("http://test.lightstreamer.com"),
1775 Some("DEMO"),
1776 Some("user1"),
1777 Some("pass1"),
1778 )?;
1779 assert_eq!(
1780 client.connection_details.get_user(),
1781 Some(&"user1".to_string())
1782 );
1783 assert_eq!(
1784 client.connection_details.get_password(),
1785 Some(&"pass1".to_string())
1786 );
1787 let result = LightstreamerClient::new(Some("invalid-url"), Some("DEMO"), None, None);
1788 assert!(result.is_err());
1789 let client = LightstreamerClient::new(None, Some("DEMO"), None, None)?;
1790 assert_eq!(client.server_address, None);
1791 Ok(())
1792 }
1793
1794 #[test]
1795 fn test_add_listener() -> Result<(), LightstreamerError> {
1796 let mut client = LightstreamerClient::new(
1797 Some("http://test.lightstreamer.com"),
1798 Some("DEMO"),
1799 None,
1800 None,
1801 )?;
1802 assert_eq!(client.listeners.len(), 0);
1803 let listener = Box::new(MockClientListener::new());
1804 client.add_listener(listener);
1805 assert_eq!(client.listeners.len(), 1);
1806 let listener2 = Box::new(MockClientListener::new());
1807 client.add_listener(listener2);
1808 assert_eq!(client.listeners.len(), 2);
1809 Ok(())
1810 }
1811
1812 #[test]
1813 fn test_get_listeners() -> Result<(), LightstreamerError> {
1814 let mut client = LightstreamerClient::new(
1815 Some("http://test.lightstreamer.com"),
1816 Some("DEMO"),
1817 None,
1818 None,
1819 )?;
1820 assert_eq!(client.get_listeners().len(), 0);
1821
1822 let listener = Box::new(MockClientListener::new());
1823 client.add_listener(listener);
1824 assert_eq!(client.get_listeners().len(), 1);
1825 Ok(())
1826 }
1827
1828 #[test]
1829 fn test_get_status() -> Result<(), LightstreamerError> {
1830 let client = LightstreamerClient::new(
1831 Some("http://test.lightstreamer.com"),
1832 Some("DEMO"),
1833 None,
1834 None,
1835 )?;
1836 match client.get_status() {
1837 ClientStatus::Disconnected(DisconnectionType::WillRetry) => {}
1838 _ => panic!("Expected initial status to be DISCONNECTED:WILL-RETRY"),
1839 }
1840 Ok(())
1841 }
1842
1843 #[test]
1844 fn test_get_subscriptions() -> Result<(), LightstreamerError> {
1845 let client = LightstreamerClient::new(
1846 Some("http://test.lightstreamer.com"),
1847 Some("DEMO"),
1848 None,
1849 None,
1850 )?;
1851 assert_eq!(client.get_subscriptions().len(), 0);
1852 Ok(())
1853 }
1854
1855 #[tokio::test]
1856 async fn test_connect_with_no_server_address() -> Result<(), LightstreamerError> {
1857 let client = LightstreamerClient::new(None, Some("DEMO"), None, None)?;
1858 let client_arc = Arc::new(tokio::sync::Mutex::new(client));
1859 let shutdown_signal = Arc::new(Notify::new());
1860 let result = LightstreamerClient::connect(client_arc, shutdown_signal).await;
1861 assert!(result.is_err());
1862 Ok(())
1863 }
1864
1865 #[tokio::test]
1866 async fn test_forced_transport_validation() -> Result<(), LightstreamerError> {
1867 let mut client = LightstreamerClient::new(
1868 Some("http://test.lightstreamer.com"),
1869 Some("DEMO"),
1870 None,
1871 None,
1872 )?;
1873
1874 client.connection_options.set_forced_transport(None);
1875 let client_arc = Arc::new(tokio::sync::Mutex::new(client));
1876 let shutdown_signal = Arc::new(Notify::new());
1877 let result = LightstreamerClient::connect(client_arc.clone(), shutdown_signal).await;
1878 assert!(result.is_err());
1879 client_arc
1880 .lock()
1881 .await
1882 .connection_options
1883 .set_forced_transport(Some(Transport::WsStreaming));
1884 Ok(())
1885 }
1886
1887 #[test]
1888 fn test_subscription_params_generation() -> Result<(), LightstreamerError> {
1889 let subscription = Subscription::new(
1890 SubscriptionMode::Merge,
1891 Some(vec!["item1".to_string(), "item2".to_string()]),
1892 Some(vec!["field1".to_string(), "field2".to_string()]),
1893 )?;
1894
1895 let params_str = LightstreamerClient::get_subscription_params(&subscription, 1)?;
1896
1897 assert!(params_str.contains("LS_reqId=1"));
1898 assert!(params_str.contains("LS_op=add"));
1899 assert!(params_str.contains("LS_subId="));
1900 assert!(params_str.contains("LS_mode=MERGE"));
1901 assert!(params_str.contains("LS_group="));
1902 assert!(params_str.contains("LS_schema="));
1903 Ok(())
1904 }
1905
1906 #[test]
1907 fn test_unsubscription_params_generation() -> Result<(), LightstreamerError> {
1908 let params_str = LightstreamerClient::get_unsubscription_params(42, 123)?;
1909
1910 assert!(params_str.contains("LS_reqId=123"));
1911 assert!(params_str.contains("LS_op=delete"));
1912 assert!(params_str.contains("LS_subId=42"));
1913 Ok(())
1914 }
1915
1916 #[test]
1917 fn test_logging_functions() -> Result<(), LightstreamerError> {
1918 let mut client = LightstreamerClient::new(
1919 Some("http://test.lightstreamer.com"),
1920 Some("DEMO"),
1921 None,
1922 None,
1923 )?;
1924
1925 client.set_logging_type(LogType::StdLogs);
1926
1927 client.make_log(Level::INFO, "Test log message");
1928 client.make_log(Level::DEBUG, "Test debug message");
1929 client.set_logging_type(LogType::TracingLogs);
1930 client.make_log(Level::INFO, "Test tracing log message");
1931 client.make_log(Level::DEBUG, "Test tracing debug message");
1932 Ok(())
1933 }
1934
1935 #[test]
1936 fn test_debug_implementation() -> Result<(), LightstreamerError> {
1937 let client = LightstreamerClient::new(
1938 Some("http://test.lightstreamer.com"),
1939 Some("DEMO"),
1940 None,
1941 None,
1942 )?;
1943
1944 // Test that Debug implementation works without panicking
1945 let debug_string = format!("{:?}", client);
1946
1947 // Verify it contains expected fields
1948 assert!(debug_string.contains("server_address"));
1949 assert!(debug_string.contains("adapter_set"));
1950 assert!(debug_string.contains("connection_details"));
1951 assert!(debug_string.contains("connection_options"));
1952 assert!(debug_string.contains("listeners"));
1953 assert!(debug_string.contains("subscriptions"));
1954
1955 // Verify the values are included
1956 assert!(debug_string.contains("http://test.lightstreamer.com"));
1957 assert!(debug_string.contains("DEMO"));
1958 Ok(())
1959 }
1960
1961 #[test]
1962 #[should_panic(expected = "Implement mechanism to add cookies to LightstreamerClient")]
1963 fn test_add_cookies() {
1964 // Test the static method add_cookies
1965 let cookie = Cookie::new("test_cookie", "test_value");
1966 LightstreamerClient::add_cookies("http://test.lightstreamer.com", &cookie);
1967 }
1968
1969 #[test]
1970 #[should_panic(expected = "not implemented")]
1971 fn test_get_cookies() {
1972 // Test the static method get_cookies
1973 LightstreamerClient::get_cookies(Some("http://test.lightstreamer.com"));
1974 }
1975}
1976
1977#[cfg(test)]
1978mod liveness_tests {
1979 use super::widen_keepalive_timeout;
1980
1981 #[test]
1982 fn test_widen_keepalive_timeout_unconfigured_adopts_server_value() {
1983 assert_eq!(widen_keepalive_timeout(0, 5_000), 5_000);
1984 }
1985
1986 #[test]
1987 fn test_widen_keepalive_timeout_server_longer_widens() {
1988 assert_eq!(widen_keepalive_timeout(15_000, 30_000), 30_000);
1989 }
1990
1991 #[test]
1992 fn test_widen_keepalive_timeout_server_shorter_never_shrinks() {
1993 assert_eq!(widen_keepalive_timeout(15_000, 5_000), 15_000);
1994 }
1995
1996 #[test]
1997 fn test_widen_keepalive_timeout_equal_values_unchanged() {
1998 assert_eq!(widen_keepalive_timeout(15_000, 15_000), 15_000);
1999 }
2000}