lightstreamer_rs/client/implementation.rs
1use crate::subscription::{ItemUpdate, Snapshot, Subscription, SubscriptionMode};
2
3use crate::client::Transport;
4pub(crate) use crate::client::listener::ClientListener;
5use crate::client::message_listener::ClientMessageListener;
6use crate::client::model::{ClientStatus, DisconnectionType, LogType};
7use crate::client::request::SubscriptionRequest;
8use crate::client::utils::get_subscription_by_id;
9use crate::connection::{ConnectionDetails, ConnectionOptions};
10use crate::connection::{ConnectionManager, ConnectionState, HeartbeatConfig, ReconnectionConfig};
11use crate::utils::{IllegalStateException, clean_message, parse_arguments};
12use cookie::Cookie;
13use futures_util::{SinkExt, StreamExt};
14use std::collections::HashMap;
15use std::error::Error;
16use std::fmt::{self, Debug, Formatter};
17use std::sync::Arc;
18use tokio::sync::mpsc::channel;
19use tokio::sync::{
20 Mutex, Notify,
21 mpsc::{Receiver, Sender},
22};
23use tokio_tungstenite::{
24 connect_async,
25 tungstenite::{
26 Message,
27 http::{HeaderName, HeaderValue, Request},
28 },
29};
30use tracing::{Level, debug, error, info, instrument, trace, warn};
31use url::Url;
32
33/// Facade class for the management of the communication to Lightstreamer Server. Used to provide
34/// configuration settings, event handlers, operations for the control of the connection lifecycle,
35/// Subscription handling and to send messages.
36///
37/// An instance of `LightstreamerClient` handles the communication with Lightstreamer Server on a
38/// specified endpoint. Hence, it hosts one "Session"; or, more precisely, a sequence of Sessions,
39/// since any Session may fail and be recovered, or it can be interrupted on purpose. So, normally,
40/// a single instance of `LightstreamerClient` is needed.
41///
42/// However, multiple instances of `LightstreamerClient` can be used, toward the same or multiple
43/// endpoints.
44///
45/// You can listen to the events generated by a session by registering an event listener, such as
46/// `ClientListener` or `SubscriptionListener`. These listeners allow you to handle various events,
47/// such as session creation, connection status, subscription updates, and server messages. However,
48/// you should be aware that the event notifications are dispatched by a single thread, the so-called
49/// event thread. This means that if the operations of a listener are slow or blocking, they will
50/// delay the processing of the other listeners and affect the performance of your application.
51/// Therefore, you should delegate any slow or blocking operations to a dedicated thread, and keep
52/// the listener methods as fast and simple as possible. Note that even if you create multiple
53/// instances of `LightstreamerClient`, they will all use a single event thread, that is shared
54/// among them.
55///
56/// # Parameters
57///
58/// * `server_address`: the address of the Lightstreamer Server to which this `LightstreamerClient`
59/// will connect to. It is possible to specify it later by using `None` here. See
60/// `ConnectionDetails.setServerAddress()` for details.
61/// * `adapter_set`: the name of the Adapter Set mounted on Lightstreamer Server to be used to handle
62/// all requests in the Session associated with this `LightstreamerClient`. It is possible not to
63/// specify it at all or to specify it later by using `None` here. See `ConnectionDetails.setAdapterSet()`
64/// for details.
65///
66/// # Raises
67///
68/// * `IllegalArgumentException`: if a not valid address is passed. See `ConnectionDetails.setServerAddress()`
69/// for details.
70pub struct LightstreamerClient {
71 /// The address of the Lightstreamer Server to which this `LightstreamerClient` will connect.
72 server_address: Option<String>,
73 /// The name of the Adapter Set mounted on Lightstreamer Server to be used to handle all
74 /// requests in the Session associated with this `LightstreamerClient`.
75 adapter_set: Option<String>,
76 /// Data object that contains the details needed to open a connection to a Lightstreamer Server.
77 /// This instance is set up by the `LightstreamerClient` object at its own creation. Properties
78 /// of this object can be overwritten by values received from a Lightstreamer Server.
79 pub connection_details: ConnectionDetails,
80 /// Data object that contains options and policies for the connection to the server. This instance
81 /// is set up by the `LightstreamerClient` object at its own creation. Properties of this object
82 /// can be overwritten by values received from a Lightstreamer Server.
83 pub connection_options: ConnectionOptions,
84 /// A list of listeners that will receive events from the `LightstreamerClient` instance.
85 listeners: Vec<Box<dyn ClientListener>>,
86 /// A list containing all the `Subscription` instances that are currently "active" on this
87 /// `LightstreamerClient`.
88 subscriptions: Vec<Subscription>,
89 /// The current status of the client.
90 status: ClientStatus,
91 /// Logging Type to be used
92 logging: LogType,
93 /// The sender that can be used to subscribe/unsubscribe
94 pub subscription_sender: Sender<SubscriptionRequest>,
95 /// The receiver used for subscribe/unsubsribe
96 subscription_receiver: Receiver<SubscriptionRequest>,
97 /// Connection manager for automatic reconnection
98 connection_manager: Option<Arc<ConnectionManager>>,
99 /// Configuration for reconnection behavior
100 reconnection_config: ReconnectionConfig,
101 /// Configuration for heartbeat monitoring
102 heartbeat_config: HeartbeatConfig,
103 /// Whether automatic reconnection is enabled
104 auto_reconnect_enabled: bool,
105}
106
107impl Debug for LightstreamerClient {
108 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
109 f.debug_struct("LightstreamerClient")
110 .field("server_address", &self.server_address)
111 .field("adapter_set", &self.adapter_set)
112 .field("connection_details", &self.connection_details)
113 .field("connection_options", &self.connection_options)
114 .field("listeners", &self.listeners)
115 .field("subscriptions", &self.subscriptions)
116 .finish()
117 }
118}
119
120impl LightstreamerClient {
121 /// A constant string representing the name of the library.
122 pub const LIB_NAME: &'static str = "rust_client";
123
124 /// A constant string representing the version of the library.
125 pub const LIB_VERSION: &'static str = "0.1.0";
126
127 //
128 // Constants for WebSocket connection.
129 //
130 /// WebSocket key used in the WebSocket handshake process.
131 /// This is a base64-encoded value that is used to establish the WebSocket connection.
132 pub const SEC_WEBSOCKET_KEY: &'static str = "PNDUibe9ex7PnsrLbt0N4w==";
133
134 /// WebSocket protocol identifier for the TLCP protocol used by Lightstreamer.
135 /// This identifies the specific subprotocol used over the WebSocket connection.
136 pub const SEC_WEBSOCKET_PROTOCOL: &'static str = "TLCP-2.4.0.lightstreamer.com";
137
138 /// WebSocket version used for the connection.
139 /// Version 13 is the standard version used in modern WebSocket implementations.
140 pub const SEC_WEBSOCKET_VERSION: &'static str = "13";
141
142 /// WebSocket upgrade header value.
143 /// Used to indicate that the client wishes to upgrade from HTTP to WebSocket protocol.
144 pub const SEC_WEBSOCKET_UPGRADE: &'static str = "websocket";
145
146 /// A constant string representing the version of the TLCP protocol used by the library.
147 pub const TLCP_VERSION: &'static str = "TLCP-2.4.0";
148
149 /// Static method that can be used to share cookies between connections to the Server (performed by
150 /// this library) and connections to other sites that are performed by the application. With this
151 /// method, cookies received by the application can be added (or replaced if already present) to
152 /// the cookie set used by the library to access the Server. Obviously, only cookies whose domain
153 /// is compatible with the Server domain will be used internally.
154 ///
155 /// This method should be invoked before calling the `LightstreamerClient.connect()` method.
156 /// However it can be invoked at any time; it will affect the internal cookie set immediately
157 /// and the sending of cookies on the next HTTP request or WebSocket establishment.
158 ///
159 /// # Parameters
160 ///
161 /// * `uri`: the URI from which the supplied cookies were received. It cannot be `None`.
162 /// * `cookies`: an instance of `http.cookies.SimpleCookie`.
163 ///
164 /// See also `getCookies()`
165 pub fn add_cookies(_uri: &str, _cookies: &Cookie) {
166 // Implementation for add_cookies
167 unimplemented!("Implement mechanism to add cookies to LightstreamerClient");
168 }
169
170 /// Adds a listener that will receive events from the `LightstreamerClient` instance.
171 ///
172 /// The same listener can be added to several different `LightstreamerClient` instances.
173 ///
174 /// A listener can be added at any time. A call to add a listener already present will be ignored.
175 ///
176 /// # Parameters
177 ///
178 /// * `listener`: An object that will receive the events as documented in the `ClientListener`
179 /// interface.
180 ///
181 /// See also `removeListener()`
182 pub fn add_listener(&mut self, listener: Box<dyn ClientListener>) {
183 self.listeners.push(listener);
184 }
185
186 /// Packs s string with the necessary parameters for a subscription request.
187 ///
188 /// # Parameters
189 ///
190 /// * `subscription`: The subscription for which to get the parameters.
191 /// * `request_id`: The request ID to use in the parameters.
192 ///
193 fn get_subscription_params(
194 subscription: &Subscription,
195 request_id: usize,
196 ) -> Result<String, Box<dyn Error + Send + Sync>> {
197 let ls_req_id = request_id.to_string();
198 let ls_sub_id = subscription.id.to_string();
199 let ls_mode = subscription.get_mode().to_string();
200 let ls_group = match subscription.get_item_group() {
201 Some(item_group) => item_group.to_string(),
202 None => match subscription.get_items() {
203 Some(items) => items.join(" "),
204 None => {
205 return Err(Box::new(std::io::Error::new(
206 std::io::ErrorKind::InvalidData,
207 "No item group or items found in subscription.",
208 )));
209 }
210 },
211 };
212 let ls_schema = match subscription.get_field_schema() {
213 Some(field_schema) => field_schema.to_string(),
214 None => match subscription.get_fields() {
215 Some(fields) => fields.join(" "),
216 None => {
217 return Err(Box::new(std::io::Error::new(
218 std::io::ErrorKind::InvalidData,
219 "No field schema or fields found in subscription.",
220 )));
221 }
222 },
223 };
224 let ls_data_adapter = match subscription.get_data_adapter() {
225 Some(data_adapter) => data_adapter.to_string(),
226 None => "".to_string(),
227 };
228 let ls_snapshot = subscription
229 .get_requested_snapshot()
230 .unwrap_or_default()
231 .to_string();
232 //
233 // Prepare the subscription request.
234 //
235 let mut params: Vec<(&str, &str)> = vec![
236 ("LS_data_adapter", &ls_data_adapter),
237 ("LS_reqId", &ls_req_id),
238 ("LS_op", "add"),
239 ("LS_subId", &ls_sub_id),
240 ("LS_mode", &ls_mode),
241 ("LS_group", &ls_group),
242 ("LS_schema", &ls_schema),
243 ("LS_ack", "false"),
244 ];
245 // Remove the data adapter parameter if not specified.
246 if ls_data_adapter.is_empty() {
247 params.remove(0);
248 }
249 if !ls_snapshot.is_empty() {
250 params.push(("LS_snapshot", &ls_snapshot));
251 }
252
253 Ok(serde_urlencoded::to_string(¶ms)?)
254 }
255
256 fn get_unsubscription_params(
257 subscription_id: usize,
258 request_id: usize,
259 ) -> Result<String, Box<dyn Error + Send + Sync>> {
260 let ls_req_id = request_id.to_string();
261 let ls_sub_id = subscription_id.to_string();
262 //
263 // Prepare the unsubscription request.
264 //
265 let params: Vec<(&str, &str)> = vec![
266 ("LS_reqId", &ls_req_id),
267 ("LS_op", "delete"),
268 ("LS_subId", &ls_sub_id),
269 ];
270
271 Ok(serde_urlencoded::to_string(¶ms)?)
272 }
273
274 /// Operation method that requests to open a Session against the configured Lightstreamer Server.
275 ///
276 /// When `connect()` is called, unless a single transport was forced through `ConnectionOptions.setForcedTransport()`,
277 /// the so called "Stream-Sense" mechanism is started: if the client does not receive any answer
278 /// for some seconds from the streaming connection, then it will automatically open a polling
279 /// connection.
280 ///
281 /// A polling connection may also be opened if the environment is not suitable for a streaming
282 /// connection.
283 ///
284 /// Note that as "polling connection" we mean a loop of polling requests, each of which requires
285 /// opening a synchronous (i.e. not streaming) connection to Lightstreamer Server.
286 ///
287 /// Note that the request to connect is accomplished by the client in a separate thread; this
288 /// means that an invocation to `getStatus()` right after `connect()` might not reflect the change
289 /// yet.
290 ///
291 /// When the request to connect is finally being executed, if the current status of the client
292 /// is not `DISCONNECTED`, then nothing will be done.
293 ///
294 /// # Raises
295 ///
296 /// * `IllegalStateException`: if no server address was configured.
297 ///
298 /// See also `getStatus()`
299 ///
300 /// See also `disconnect()`
301 ///
302 /// See also `ClientListener.onStatusChange()`
303 ///
304 /// See also `ConnectionDetails.setServerAddress()`
305 #[instrument(level = "trace")]
306 pub async fn connect(
307 client: Arc<Mutex<Self>>,
308 shutdown_signal: Arc<Notify>,
309 ) -> Result<(), Box<dyn Error + Send + Sync>> {
310 // If auto-reconnect is enabled, use ConnectionManager
311 let auto_reconnect_enabled = {
312 let client_guard = client.lock().await;
313 client_guard.auto_reconnect_enabled
314 };
315
316 if auto_reconnect_enabled {
317 let (reconnection_config, heartbeat_config) = {
318 let client_guard = client.lock().await;
319 (
320 client_guard.reconnection_config.clone(),
321 client_guard.heartbeat_config.clone(),
322 )
323 };
324
325 let weak_client = Arc::downgrade(&client);
326 let connection_manager: Arc<ConnectionManager> =
327 ConnectionManager::new(weak_client, reconnection_config, heartbeat_config);
328
329 {
330 let mut client_guard = client.lock().await;
331 client_guard.connection_manager = Some(connection_manager.clone());
332 }
333
334 // Use direct connection method
335 return client.lock().await.connect_direct(shutdown_signal).await;
336 }
337
338 // Fallback to direct connection without auto-reconnect
339 let mut client_guard = client.lock().await;
340 client_guard.connect_direct(shutdown_signal).await
341 }
342
343 /// Direct connection method without automatic reconnection
344 #[instrument(level = "trace")]
345 pub async fn connect_direct(
346 &mut self,
347 shutdown_signal: Arc<Notify>,
348 ) -> Result<(), Box<dyn Error + Send + Sync>> {
349 // Check if the server address is configured.
350 if self.server_address.is_none() {
351 return Err(Box::new(IllegalStateException::new(
352 "No server address was configured.",
353 )));
354 }
355 //
356 // Only WebSocket streaming transport is currently supported.
357 //
358 let forced_transport = self.connection_options.get_forced_transport();
359 if forced_transport.is_none()
360 || *forced_transport.unwrap() /* unwrap() is safe here */ != Transport::WsStreaming
361 {
362 return Err(Box::new(IllegalStateException::new(
363 "Only WebSocket streaming transport is currently supported.",
364 )));
365 }
366 //
367 // Convert the HTTP URL to a WebSocket URL.
368 //
369 let http_url = self.connection_details.get_server_address().unwrap(); // unwrap() is safe here.
370 let mut url = Url::parse(http_url)
371 .expect("Failed to parse server address URL from connection details.");
372 match url.scheme() {
373 "http" => url
374 .set_scheme("ws")
375 .expect("Failed to set scheme to ws for WebSocket URL."),
376 "https" => url
377 .set_scheme("wss")
378 .expect("Failed to set scheme to wss for WebSocket URL."),
379 invalid_scheme => {
380 return Err(Box::new(IllegalStateException::new(&format!(
381 "Unsupported scheme '{}' found when converting HTTP URL to WebSocket URL.",
382 invalid_scheme
383 ))));
384 }
385 }
386 let ws_url = url.as_str();
387
388 // Build the WebSocket request with the necessary headers.
389 let request = Request::builder()
390 .uri(ws_url)
391 .header(
392 HeaderName::from_static("connection"),
393 HeaderValue::from_static("Upgrade"),
394 )
395 .header(
396 HeaderName::from_static("host"),
397 HeaderValue::from_str(url.host_str().unwrap_or("localhost")).map_err(|err| {
398 IllegalStateException::new(&format!(
399 "Invalid header value for header with name 'host': {}",
400 err
401 ))
402 })?,
403 )
404 .header(
405 HeaderName::from_static("sec-websocket-key"),
406 HeaderValue::from_static(Self::SEC_WEBSOCKET_KEY),
407 )
408 .header(
409 HeaderName::from_static("sec-websocket-protocol"),
410 HeaderValue::from_static(Self::SEC_WEBSOCKET_PROTOCOL),
411 )
412 .header(
413 HeaderName::from_static("sec-websocket-version"),
414 HeaderValue::from_static(Self::SEC_WEBSOCKET_VERSION),
415 )
416 .header(
417 HeaderName::from_static("upgrade"),
418 HeaderValue::from_static(Self::SEC_WEBSOCKET_UPGRADE),
419 )
420 .body(())?;
421
422 // Connect to the Lightstreamer server using WebSocket.
423 let ws_stream = match connect_async(request).await {
424 Ok((ws_stream, response)) => {
425 if let Some(server_header) = response.headers().get("server") {
426 self.make_log(
427 Level::INFO,
428 &format!(
429 "Connected to Lightstreamer server: {}",
430 server_header.to_str().unwrap_or("")
431 ),
432 );
433 } else {
434 self.make_log(Level::INFO, "Connected to Lightstreamer server");
435 }
436 ws_stream
437 }
438 Err(err) => {
439 return Err(Box::new(std::io::Error::new(
440 std::io::ErrorKind::ConnectionRefused,
441 format!(
442 "Failed to connect to Lightstreamer server with WebSocket: {}",
443 err
444 ),
445 )));
446 }
447 };
448
449 // Split the WebSocket stream into a write and a read stream.
450 let (mut write_stream, mut read_stream) = ws_stream.split();
451
452 //
453 // Initiate communication with the server by sending a 'wsok' message.
454 //
455 write_stream.send(Message::Text("wsok".into())).await?;
456
457 //
458 // Start reading and processing messages from the server.
459 //
460 let mut is_connected = false;
461 let mut request_id: usize = 0;
462 let mut _session_id: Option<String> = None;
463 let mut subscription_id: usize = 0;
464 let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> =
465 HashMap::new();
466 loop {
467 tokio::select! {
468 message = read_stream.next() => {
469 match message {
470 Some(Ok(Message::Text(text))) => {
471 // Messages could include multiple submessages separated by /r/n.
472 // Split the message into submessages and process each one separately.
473 let submessages: Vec<&str> = text.split("\r\n")
474 .filter(|&line| !line.trim().is_empty()) // Filter out empty lines.
475 .collect();
476 for submessage in submessages {
477 let clean_text = clean_message(submessage);
478 let submessage_fields: Vec<&str> = clean_text.split(",").collect();
479 match *submessage_fields.first().unwrap_or(&"") {
480 //
481 // Errors from server.
482 //
483 "conerr" | "reqerr" => {
484 self.make_log( Level::ERROR, &format!("Received connection error from Lightstreamer server: {}", clean_text) );
485 break;
486 },
487 //
488 // Session created successfully.
489 //
490 "conok" => {
491 is_connected = true;
492 if let Some(session_id) = submessage_fields.get(1) {
493 self.make_log( Level::DEBUG, &format!("Session creation confirmed by server: {}", clean_text) );
494 self.make_log( Level::DEBUG, &format!("Session created with ID: {:?}", session_id) );
495 //
496 // Subscribe to the desired items.
497 //
498 while let Some(subscription) = self.subscriptions.get_mut(subscription_id) {
499 //
500 // Gather all the necessary subscription parameters.
501 //
502 subscription_id += 1;
503 request_id += 1;
504 subscription.id = subscription_id;
505 subscription.id_sender.try_send(subscription_id)?;
506
507 let encoded_params = match Self::get_subscription_params(subscription, request_id)
508 {
509 Ok(params) => params,
510 Err(err) => {
511 return Err(err);
512 },
513 };
514
515 write_stream
516 .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
517 .await?;
518 debug!("Sent subscription request: '{}'", encoded_params);
519 }
520 } else {
521 return Err(Box::new(std::io::Error::new(
522 std::io::ErrorKind::InvalidData,
523 "Session ID not found in 'conok' message from server",
524 )));
525 }
526 },
527 //
528 // Notifications from server.
529 //
530 "conf" | "cons" | "clientip" | "servname" | "prog" | "sync" | "eos" => {
531 self.make_log( Level::INFO, &format!("Received notification from server: {}", clean_text) );
532 // Don't do anything with these notifications for now.
533 },
534 "probe" => {
535 self.make_log( Level::DEBUG, &format!("Received probe message from server: {}", clean_text ) );
536 },
537 "reqok" => {
538 self.make_log( Level::DEBUG, &format!("Received reqok message from server: '{}'", clean_text ) );
539 },
540 //
541 // Subscription confirmation from server.
542 //
543 "subok" => {
544 self.make_log( Level::INFO, &format!("Subscription confirmed by server: '{}'", clean_text) );
545 },
546 //
547 // Usubscription confirmation from server.
548 //
549 "unsub" => {
550 self.make_log( Level::INFO, &format!("Unsubscription confirmed by server: '{}'", clean_text) );
551 },
552 //
553 // Data updates from server.
554 //
555 "u" => {
556 // Parse arguments from the received message.
557 let arguments = parse_arguments(&clean_text);
558 //
559 // Extract the subscription from the first argument.
560 //
561 let subscription_index = arguments.get(1).unwrap_or(&"").parse::<usize>().unwrap_or(0);
562 let subscription = match get_subscription_by_id(self.get_subscriptions(), subscription_index) {
563 Some(subscription) => subscription,
564 None => {
565 self.make_log( Level::WARN, &format!("Subscription not found for index: {}", subscription_index) );
566 continue;
567
568 }
569 };
570 //
571 // Extract the item from the second argument.
572 //
573 let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0);
574 let item = match subscription.get_items() {
575 Some(items) => items.get(item_index-1),
576 None => {
577 self.make_log( Level::WARN, &format!("No items found in subscription: {:?}", subscription) );
578 continue;
579 }
580 };
581 //
582 // Determine if the update is a snapshot or real-time update based on the subscription parameters.
583 //
584 let is_snapshot = match subscription.get_requested_snapshot() {
585 Some(ls_snapshot) => {
586 match ls_snapshot {
587 Snapshot::No => false,
588 Snapshot::Yes => {
589 match subscription.get_mode() {
590 SubscriptionMode::Merge => {
591 if arguments.len() == 4 && arguments[3] == "$" {
592 // EOS notification received
593 true
594 } else {
595 // If item doesn't exist in item_updates yet, the first update
596 // is always a snapshot.
597 if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) {
598 if item_updates.get(&(item_index)).is_some() {
599 // Item update already exists in item_updates, so it's not a snapshot.
600 false
601 } else {
602 // Item update doesn't exist in item_updates, so the first update is always a snapshot.
603 true
604 }
605 } else {
606 // Item updates not found for subscription, so the first update is always a snapshot.
607 true
608 }
609 }
610 },
611 SubscriptionMode::Distinct | SubscriptionMode::Command => {
612 !subscription.is_subscribed()
613 },
614 _ => false,
615 }
616 },
617 _ => false,
618 }
619 },
620 None => false,
621 };
622
623 // Extract the field values from the third argument.
624 let field_values: Vec<&str> = arguments.get(3).unwrap_or(&"").split('|').collect();
625
626 //
627 // Get fields from subscription and create a HashMap of field names and values.
628 //
629 let subscription_fields = subscription.get_fields();
630 let mut field_map: HashMap<String, Option<String>> = subscription_fields
631 .map(|fields| fields.iter().map(|field_name| (field_name.to_string(), None)).collect())
632 .unwrap_or_default();
633
634 let mut field_index = 0;
635 for value in field_values {
636 match value {
637 "" => {
638 // An empty value means the field is unchanged compared to the previous update of the same field.
639 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
640 field_map.insert(field_name.to_string(), None);
641 }
642 field_index += 1;
643 }
644 "#" | "$" => {
645 // A value corresponding to a hash sign "#" or dollar sign "$" means the field is null or empty.
646 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
647 field_map.insert(field_name.to_string(), Some("".to_string()));
648 }
649 field_index += 1;
650 }
651 value if value.starts_with('^') => {
652 let command = value.chars().nth(1).unwrap_or(' ');
653 match command {
654 '0'..='9' => {
655 let count = value[1..].parse().unwrap_or(0);
656 for i in 0..count {
657 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index + i)) {
658 field_map.insert(field_name.to_string(), None);
659 }
660 }
661 field_index += count;
662 }
663 'P' | 'T' => {
664 let diff_value = serde_urlencoded::from_str(&value[2..]).unwrap_or_else(|_| value[2..].to_string());
665 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index))
666 && let Some(prev_value) = field_map.get(field_name).and_then(|v| v.as_ref()) {
667 let new_value = match command {
668 'P' => {
669 // Apply JSON Patch
670 let patch: serde_json::Value = serde_json::from_str(&diff_value).unwrap_or(serde_json::Value::Null);
671 let mut prev_json: serde_json::Value = serde_json::from_str(prev_value).unwrap_or(serde_json::Value::Null);
672 let patch_operations: Vec<json_patch::PatchOperation> = serde_json::from_value(patch).unwrap_or_default();
673 let _ = json_patch::patch(&mut prev_json, &patch_operations);
674 prev_json.to_string()
675 }
676 'T' => {
677 // Apply TLCP-diff
678 //tlcp_diff::apply_diff(prev_value, &diff_value).unwrap_or_else(|_| prev_value.to_string())
679 unimplemented!("Implement TLCP-diff");
680 }
681 _ => unreachable!(),
682 };
683 field_map.insert(field_name.to_string(), Some(new_value.to_string()));
684 }
685 field_index += 1;
686 }
687 _ => {
688 let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
689 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
690 field_map.insert(field_name.to_string(), Some(decoded_value));
691 }
692 field_index += 1;
693 }
694 }
695 }
696 value if value.starts_with('{') => {
697 // in this case it is a json payload that we will let the consumer handle. In this case, it is important
698 // to preserve casing for parsing.
699 let original_json = parse_arguments(submessage).get(3).unwrap_or(&"").split('|').collect::<Vec<&str>>();
700 let mut payload = "";
701 for json in original_json.iter()
702 {
703 if json.is_empty() || *json == "#"
704 {
705 continue;
706 }
707
708 payload = json;
709 }
710
711 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
712 field_map.insert(field_name.to_string(), Some(payload.to_string()));
713 }
714 field_index += 1;
715 }
716 _ => {
717 let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
718 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
719 field_map.insert(field_name.to_string(), Some(decoded_value));
720 }
721 field_index += 1;
722 }
723 }
724 }
725
726 // Store only item_update's changed fields.
727 let changed_fields: HashMap<String, String> = field_map.iter()
728 .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
729 .collect();
730
731 //
732 // Take the proper item_update from item_updates and update it with changed fields.
733 // If the item_update doesn't exist yet, create a new one.
734 //
735 let current_item_update: ItemUpdate;
736 match subscription_item_updates.get_mut(&(subscription_index)) {
737 Some(item_updates) => match item_updates.get_mut(&(item_index)) {
738 Some(item_update) => {
739 //
740 // Iterate changed_fields and update existing item_update.fields assigning the new values.
741 //
742 for (field_name, new_value) in &changed_fields {
743 if item_update.fields.contains_key(field_name) {
744 item_update.fields.insert((*field_name).clone(), Some(new_value.clone()));
745 }
746 }
747 item_update.changed_fields = changed_fields.clone();
748 item_update.is_snapshot = is_snapshot;
749 current_item_update = item_update.clone();
750 },
751 None => {
752 // Create a new item_update and add it to item_updates.
753 let item_update = ItemUpdate {
754 item_name: item.cloned(),
755 item_pos: item_index,
756 fields: field_map.clone(),
757 changed_fields: changed_fields.clone(),
758 is_snapshot,
759 };
760 current_item_update = item_update.clone();
761 item_updates.insert(item_index, item_update);
762 }
763 },
764 None => {
765 // Create a new item_update and add it to item_updates.
766 let item_update = ItemUpdate {
767 item_name: item.cloned(),
768 item_pos: item_index,
769 fields: field_map,
770 changed_fields,
771 is_snapshot,
772 };
773 current_item_update = item_update.clone();
774 let mut item_updates = HashMap::new();
775 item_updates.insert(item_index, item_update);
776 subscription_item_updates.insert(subscription_index, item_updates);
777 }
778 };
779
780 // Get mutable subscription listeners directly.
781 let subscription_listeners = subscription.get_listeners();
782
783 // Iterate subscription listeners and call on_item_update for each listener.
784 for listener in subscription_listeners {
785 listener.on_item_update(¤t_item_update);
786 }
787 }
788 //
789 // Connection confirmation from server.
790 //
791 "wsok" => {
792 self.make_log( Level::INFO, &format!("Connection confirmed by server: '{}'", clean_text) );
793 //
794 // Request session creation.
795 //
796 let ls_adapter_set = match self.connection_details.get_adapter_set() {
797 Some(adapter_set) => adapter_set,
798 None => {
799 return Err(Box::new(IllegalStateException::new(
800 "No adapter set found in connection details.",
801 )));
802 },
803 };
804 let ls_send_sync = self.connection_options.get_send_sync().to_string();
805 let mut params: Vec<(&str, &str)> = vec![
806 ("LS_adapter_set", ls_adapter_set),
807 ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"),
808 ("LS_send_sync", &ls_send_sync),
809 ];
810 if let Some(user) = &self.connection_details.get_user() {
811 params.push(("LS_user", user));
812 }
813 if let Some(password) = &self.connection_details.get_password() {
814 params.push(("LS_password", password));
815 }
816 params.push(("LS_protocol", Self::TLCP_VERSION));
817 let encoded_params = serde_urlencoded::to_string(¶ms)?;
818 write_stream
819 .send(Message::Text(format!("create_session\r\n{}\n", encoded_params).into()))
820 .await?;
821 self.make_log( Level::DEBUG, &format!("Sent create session request: '{}'", encoded_params) );
822 },
823 unexpected_message => {
824 return Err(Box::new(std::io::Error::new(
825 std::io::ErrorKind::InvalidData,
826 format!(
827 "Unexpected message received from server: '{:?}'",
828 unexpected_message
829 ),
830 )));
831 },
832 }
833 }
834 },
835 Some(Ok(non_text_message)) => {
836 return Err(Box::new(std::io::Error::new(
837 std::io::ErrorKind::InvalidData,
838 format!(
839 "Unexpected non-text message from server: {:?}",
840 non_text_message
841 ),
842 )));
843 },
844 Some(Err(err)) => {
845 return Err(Box::new(std::io::Error::new(
846 std::io::ErrorKind::InvalidData,
847 format!("Error reading message from server: {}", err),
848 )));
849 },
850 None => {
851 self.make_log( Level::DEBUG, "No more messages from server" );
852 break;
853 },
854 }
855 },
856 Some(subscription_request) = self.subscription_receiver.recv() => {
857 request_id += 1;
858 // Process subscription requests.
859 if subscription_request.subscription.is_some()
860 {
861 self.subscriptions.push(subscription_request.subscription.unwrap());
862
863 // if we are not connected yet, we will subscribe later
864 if !is_connected {
865 continue;
866 }
867
868 subscription_id += 1;
869 self.subscriptions.last_mut().unwrap().id = subscription_id;
870 self.subscriptions.last().unwrap().id_sender.try_send(subscription_id)?;
871
872 let encoded_params = match Self::get_subscription_params(self.subscriptions.last().unwrap(), request_id)
873 {
874 Ok(params) => params,
875 Err(err) => {
876 return Err(err);
877 },
878 };
879
880 write_stream
881 .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
882 .await?;
883
884 self.make_log( Level::INFO, &format!("Sent subscription request: '{}'", encoded_params) );
885 }
886 // Process unsubscription requests.
887 else if subscription_request.subscription_id.is_some()
888 {
889 let unsubscription_id = subscription_request.subscription_id.unwrap();
890 let encoded_params = match Self::get_unsubscription_params(unsubscription_id, request_id)
891 {
892 Ok(params) => params,
893 Err(err) => {
894 return Err(err);
895 },
896 };
897
898 write_stream
899 .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
900 .await?;
901
902 self.make_log( Level::INFO, &format!("Sent unsubscription request: '{}'", encoded_params) );
903
904 self.subscriptions.retain(|s| s.id != unsubscription_id);
905
906 if self.subscriptions.is_empty()
907 {
908 self.make_log( Level::INFO, "No more subscriptions, disconnecting" );
909 shutdown_signal.notify_one();
910 }
911 }
912 },
913 _ = shutdown_signal.notified() => {
914 self.make_log( Level::INFO, "Received shutdown signal" );
915 break;
916 },
917 }
918 }
919
920 Ok(())
921 }
922
923 /// Operation method that requests to close the Session opened against the configured Lightstreamer
924 /// Server (if any).
925 ///
926 /// When `disconnect()` is called, the "Stream-Sense" mechanism is stopped.
927 ///
928 /// Note that active `Subscription` instances, associated with this `LightstreamerClient` instance,
929 /// are preserved to be re-subscribed to on future Sessions.
930 ///
931 /// Note that the request to disconnect is accomplished by the client in a separate thread; this
932 /// means that an invocation to `getStatus()` right after `disconnect()` might not reflect the
933 /// change yet.
934 ///
935 /// When the request to disconnect is finally being executed, if the status of the client is
936 /// "DISCONNECTED", then nothing will be done.
937 ///
938 /// See also `connect()`
939 #[instrument(level = "trace")]
940 pub async fn disconnect(&mut self) {
941 self.make_log(Level::INFO, "Disconnecting from Lightstreamer server");
942
943 // If auto-reconnect is enabled and we have a connection manager, stop it
944 if self.auto_reconnect_enabled {
945 if let Some(ref manager) = self.connection_manager {
946 manager.shutdown().await;
947 }
948 self.connection_manager = None;
949 }
950
951 // Update status to disconnected
952 self.status = ClientStatus::Disconnected(DisconnectionType::WillRetry);
953
954 // Notify listeners about status change
955 for listener in &self.listeners {
956 listener.on_status_change(&self.status.to_string());
957 }
958 }
959
960 /// Static inquiry method that can be used to share cookies between connections to the Server
961 /// (performed by this library) and connections to other sites that are performed by the application.
962 /// With this method, cookies received from the Server can be extracted for sending through other
963 /// connections, according with the URI to be accessed.
964 ///
965 /// See `addCookies()` for clarifications on when cookies are directly stored by the library and
966 /// when not.
967 ///
968 /// # Parameters
969 ///
970 /// * `uri`: the URI to which the cookies should be sent, or `None`.
971 ///
972 /// # Returns
973 ///
974 /// A list with the various cookies that can be sent in a HTTP request for the specified URI.
975 /// If a `None` URI was supplied, all available non-expired cookies will be returned.
976 pub fn get_cookies(_uri: Option<&str>) -> Cookie<'_> {
977 // Implementation for get_cookies
978 unimplemented!()
979 }
980
981 /// Returns a list containing the `ClientListener` instances that were added to this client.
982 ///
983 /// # Returns
984 ///
985 /// A list containing the listeners that were added to this client.
986 ///
987 /// See also `addListener()`
988 pub fn get_listeners(&self) -> &Vec<Box<dyn ClientListener>> {
989 &self.listeners
990 }
991
992 /// Inquiry method that gets the current client status and transport (when applicable).
993 ///
994 /// # Returns
995 ///
996 /// The current client status. It can be one of the following values:
997 ///
998 /// - `"CONNECTING"`: the client is waiting for a Server's response in order to establish a connection;
999 /// - `"CONNECTED:STREAM-SENSING"`: the client has received a preliminary response from the server
1000 /// and is currently verifying if a streaming connection is possible;
1001 /// - `"CONNECTED:WS-STREAMING"`: a streaming connection over WebSocket is active;
1002 /// - `"CONNECTED:HTTP-STREAMING"`: a streaming connection over HTTP is active;
1003 /// - `"CONNECTED:WS-POLLING"`: a polling connection over WebSocket is in progress;
1004 /// - `"CONNECTED:HTTP-POLLING"`: a polling connection over HTTP is in progress;
1005 /// - `"STALLED"`: the Server has not been sending data on an active streaming connection for
1006 /// longer than a configured time;
1007 /// - `"DISCONNECTED:WILL-RETRY"`: no connection is currently active but one will be opened
1008 /// (possibly after a timeout);
1009 /// - `"DISCONNECTED:TRYING-RECOVERY"`: no connection is currently active, but one will be opened
1010 /// as soon as possible, as an attempt to recover the current session after a connection issue;
1011 /// - `"DISCONNECTED"`: no connection is currently active.
1012 ///
1013 /// See also `ClientListener.onStatusChange()`
1014 pub fn get_status(&self) -> &ClientStatus {
1015 &self.status
1016 }
1017
1018 /// Inquiry method that returns a list containing all the `Subscription` instances that are
1019 /// currently "active" on this `LightstreamerClient`.
1020 ///
1021 /// Internal second-level `Subscription` are not included.
1022 ///
1023 /// # Returns
1024 ///
1025 /// A list, containing all the `Subscription` currently "active" on this `LightstreamerClient`.
1026 /// The list can be empty.
1027 ///
1028 /// See also `subscribe()`
1029 pub fn get_subscriptions(&self) -> &Vec<Subscription> {
1030 &self.subscriptions
1031 }
1032
1033 /// Creates a new instance of `LightstreamerClient`.
1034 ///
1035 /// The constructor initializes the client with the server address and adapter set, if provided.
1036 /// It sets up the connection details and options for the client. If no server address or
1037 /// adapter set is specified, those properties on the client will be `None`. This allows
1038 /// for late configuration of these details before connecting to the Lightstreamer server.
1039 ///
1040 /// # Arguments
1041 /// * `server_address` - An optional reference to a string slice that represents the server
1042 /// address to connect to. If `None`, the server address must be set later.
1043 /// * `adapter_set` - An optional reference to a string slice that specifies the adapter set name.
1044 /// If `None`, the adapter set must be set later.
1045 ///
1046 /// # Returns
1047 /// A result containing the new `LightstreamerClient` instance if successful, or an
1048 /// `IllegalStateException` if the initialization fails due to invalid state conditions.
1049 ///
1050 /// # Panics
1051 /// Does not panic under normal circumstances. However, unexpected internal errors during
1052 /// the creation of internal components could cause panics, which should be considered when
1053 /// using this function in production code.
1054 ///
1055 pub fn new(
1056 server_address: Option<&str>,
1057 adapter_set: Option<&str>,
1058 username: Option<&str>,
1059 password: Option<&str>,
1060 ) -> Result<LightstreamerClient, Box<dyn Error>> {
1061 let connection_details =
1062 ConnectionDetails::new(server_address, adapter_set, username, password)?;
1063 let connection_options = ConnectionOptions::default();
1064 let (subscription_sender, subscription_receiver) = channel(100);
1065
1066 Ok(LightstreamerClient {
1067 server_address: server_address.map(|s| s.to_string()),
1068 adapter_set: adapter_set.map(|s| s.to_string()),
1069 connection_details,
1070 connection_options,
1071 listeners: Vec::new(),
1072 subscriptions: Vec::new(),
1073 status: ClientStatus::Disconnected(DisconnectionType::WillRetry),
1074 logging: LogType::StdLogs,
1075 subscription_sender,
1076 subscription_receiver,
1077 connection_manager: None,
1078 reconnection_config: ReconnectionConfig::default(),
1079 heartbeat_config: HeartbeatConfig::default(),
1080 auto_reconnect_enabled: false,
1081 })
1082 }
1083
1084 /// Removes a listener from the `LightstreamerClient` instance so that it will not receive
1085 /// events anymore.
1086 ///
1087 /// A listener can be removed at any time.
1088 ///
1089 /// # Parameters
1090 ///
1091 /// * `listener`: The listener to be removed.
1092 ///
1093 /// See also `addListener()`
1094 pub fn remove_listener(&mut self, _listener: Box<dyn ClientListener>) {
1095 unimplemented!("Implement mechanism to remove listener from LightstreamerClient");
1096 //self.listeners.remove(&listener);
1097 }
1098
1099 /// Operation method that sends a message to the Server. The message is interpreted and handled
1100 /// by the Metadata Adapter associated to the current Session. This operation supports in-order
1101 /// guaranteed message delivery with automatic batching. In other words, messages are guaranteed
1102 /// to arrive exactly once and respecting the original order, whatever is the underlying transport
1103 /// (HTTP or WebSockets). Furthermore, high frequency messages are automatically batched, if
1104 /// necessary, to reduce network round trips.
1105 ///
1106 /// Upon subsequent calls to the method, the sequential management of the involved messages is
1107 /// guaranteed. The ordering is determined by the order in which the calls to `sendMessage` are
1108 /// issued.
1109 ///
1110 /// If a message, for any reason, doesn't reach the Server (this is possible with the HTTP transport),
1111 /// it will be resent; however, this may cause the subsequent messages to be delayed. For this
1112 /// reason, each message can specify a "delayTimeout", which is the longest time the message,
1113 /// after reaching the Server, can be kept waiting if one of more preceding messages haven't
1114 /// been received yet. If the "delayTimeout" expires, these preceding messages will be discarded;
1115 /// any discarded message will be notified to the listener through `ClientMessageListener.onDiscarded()`.
1116 /// Note that, because of the parallel transport of the messages, if a zero or very low timeout
1117 /// is set for a message and the previous message was sent immediately before, it is possible
1118 /// that the latter gets discarded even if no communication issues occur. The Server may also
1119 /// enforce its own timeout on missing messages, to prevent keeping the subsequent messages for
1120 /// long time.
1121 ///
1122 /// Sequence identifiers can also be associated with the messages. In this case, the sequential
1123 /// management is restricted to all subsets of messages with the same sequence identifier associated.
1124 ///
1125 /// Notifications of the operation outcome can be received by supplying a suitable listener. The
1126 /// supplied listener is guaranteed to be eventually invoked; listeners associated with a sequence
1127 /// are guaranteed to be invoked sequentially.
1128 ///
1129 /// The "UNORDERED_MESSAGES" sequence name has a special meaning. For such a sequence, immediate
1130 /// processing is guaranteed, while strict ordering and even sequentialization of the processing
1131 /// is not enforced. Likewise, strict ordering of the notifications is not enforced. However,
1132 /// messages that, for any reason, should fail to reach the Server whereas subsequent messages
1133 /// had succeeded, might still be discarded after a server-side timeout, in order to ensure that
1134 /// the listener eventually gets a notification.
1135 ///
1136 /// Moreover, if "UNORDERED_MESSAGES" is used and no listener is supplied, a "fire and forget"
1137 /// scenario is assumed. In this case, no checks on missing, duplicated or overtaken messages
1138 /// are performed at all, so as to optimize the processing and allow the highest possible throughput.
1139 ///
1140 /// Since a message is handled by the Metadata Adapter associated to the current connection, a
1141 /// message can be sent only if a connection is currently active. If the special `enqueueWhileDisconnected`
1142 /// flag is specified it is possible to call the method at any time and the client will take
1143 /// care of sending the message as soon as a connection is available, otherwise, if the current
1144 /// status is "DISCONNECTED*", the message will be abandoned and the `ClientMessageListener.onAbort()`
1145 /// event will be fired.
1146 ///
1147 /// Note that, in any case, as soon as the status switches again to "DISCONNECTED*", any message
1148 /// still pending is aborted, including messages that were queued with the `enqueueWhileDisconnected`
1149 /// flag set to `true`.
1150 ///
1151 /// Also note that forwarding of the message to the server is made in a separate thread, hence,
1152 /// if a message is sent while the connection is active, it could be aborted because of a subsequent
1153 /// disconnection. In the same way a message sent while the connection is not active might be
1154 /// sent because of a subsequent connection.
1155 ///
1156 /// # Parameters
1157 ///
1158 /// * `message`: a text message, whose interpretation is entirely demanded to the Metadata Adapter
1159 /// associated to the current connection.
1160 /// * `sequence`: an alphanumeric identifier, used to identify a subset of messages to be managed
1161 /// in sequence; underscore characters are also allowed. If the "UNORDERED_MESSAGES" identifier
1162 /// is supplied, the message will be processed in the special way described above. The parameter
1163 /// is optional; if set to `None`, "UNORDERED_MESSAGES" is used as the sequence name.
1164 /// * `delay_timeout`: a timeout, expressed in milliseconds. If higher than the Server configured
1165 /// timeout on missing messages, the latter will be used instead. The parameter is optional; if
1166 /// a negative value is supplied, the Server configured timeout on missing messages will be applied.
1167 /// This timeout is ignored for the special "UNORDERED_MESSAGES" sequence, although a server-side
1168 /// timeout on missing messages still applies.
1169 /// * `listener`: an object suitable for receiving notifications about the processing outcome. The
1170 /// parameter is optional; if not supplied, no notification will be available.
1171 /// * `enqueue_while_disconnected`: if this flag is set to `true`, and the client is in a disconnected
1172 /// status when the provided message is handled, then the message is not aborted right away but
1173 /// is queued waiting for a new session. Note that the message can still be aborted later when
1174 /// a new session is established.
1175 pub fn send_message(
1176 &mut self,
1177 message: &str,
1178 sequence: Option<&str>,
1179 _delay_timeout: Option<u64>,
1180 listener: Option<Box<dyn ClientMessageListener>>,
1181 enqueue_while_disconnected: bool,
1182 ) {
1183 let _sequence = sequence.unwrap_or("UNORDERED_MESSAGES");
1184
1185 // Handle the message based on the current connection status
1186 match &self.status {
1187 ClientStatus::Connected(_connection_type) => {
1188 // Send the message to the server in a separate thread
1189 // ...
1190 }
1191 ClientStatus::Disconnected(_disconnection_type) => {
1192 if enqueue_while_disconnected {
1193 // Enqueue the message to be sent when a connection is available
1194 // ...
1195 } else {
1196 // Abort the message and notify the listener
1197 if let Some(listener) = listener {
1198 listener.on_abort(message, false);
1199 }
1200 }
1201 }
1202 _ => {
1203 // Enqueue the message to be sent when a connection is available
1204 // ...
1205 }
1206 }
1207 unimplemented!("Complete mechanism to send message to LightstreamerClient.");
1208 }
1209
/// Static method meant to configure the logging system used by the library. In the
/// Lightstreamer API the logging system must respect the `LoggerProvider` interface, and a
/// custom class can be used to wrap any third-party logging system; if no logging system is
/// specified, all the generated log is discarded.
///
/// The API defines the following log categories:
///
/// - `lightstreamer.stream`: socket activity on Lightstreamer Server connections; socket
///   operations at INFO level, read/write data exchange at DEBUG level.
/// - `lightstreamer.protocol`: requests to Lightstreamer Server and Server answers; requests at
///   INFO level, request details and events from the Server at DEBUG level.
/// - `lightstreamer.session`: Server Session lifecycle events; lifecycle events at INFO level,
///   lifecycle event details at DEBUG level.
/// - `lightstreamer.subscriptions`: subscription requests received by the clients and the
///   related updates; alert events from the Server at WARN level, subscriptions and
///   unsubscriptions at INFO level, requests batching and update details at DEBUG level.
/// - `lightstreamer.actions`: settings / API calls.
///
/// # Parameters
///
/// None at the moment: the `LoggerProvider` argument foreseen by the API is not part of the
/// current signature.
///
/// # Panics
///
/// Always panics: the method is not implemented yet.
pub fn set_logger_provider() {
    // Placeholder until a logger provider abstraction is available.
    unimplemented!("Implement mechanism to set logger provider for LightstreamerClient.");
}
1236 /*
1237 pub fn set_logger_provider(provider: LoggerProvider) {
1238 // Implementation for set_logger_provider
1239 }
1240 */
1241
/// Meant to provide a means to control the way TLS certificates are evaluated, with the
/// possibility to accept untrusted ones.
///
/// Per the Lightstreamer API, it may be called only once, before creating any
/// `LightstreamerClient` instance; a missing factory is reported as an
/// `IllegalArgumentException` and an already installed one as an `IllegalStateException`.
///
/// # Parameters
///
/// None at the moment: the factory argument foreseen by the API is not part of the current
/// signature.
///
/// # Panics
///
/// Always panics: the method is not implemented yet.
pub fn set_trust_manager_factory() {
    // Placeholder until TLS trust configuration is available.
    unimplemented!("Implement mechanism to set trust manager factory for LightstreamerClient.");
}
1258 /*
1259 pub fn set_trust_manager_factory(factory: Option<SslContext>) -> Result<(), IllegalArgumentException> {
1260 if factory.is_none() {
1261 return Err(IllegalArgumentException::new(
1262 "Factory cannot be None",
1263 ));
1264 }
1265
1266 // Implementation for set_trust_manager_factory
1267 Ok(())
1268 }
1269 */
1270
1271 /// Adds a subscription to the `LightstreamerClient` instance.
1272 ///
1273 /// Active subscriptions are subscribed to through the server as soon as possible (i.e. as soon
1274 /// as there is a session available). Active `Subscription` are automatically persisted across different
1275 /// sessions as long as a related unsubscribe call is not issued.
1276 ///
1277 /// Subscriptions can be given to the `LightstreamerClient` at any time. Once done the `Subscription`
1278 /// immediately enters the "active" state.
1279 ///
1280 /// Once "active", a `Subscription` instance cannot be provided again to a `LightstreamerClient`
1281 /// unless it is first removed from the "active" state through a call to `unsubscribe()`.
1282 ///
1283 /// Also note that forwarding of the subscription to the server is made in a separate thread.
1284 ///
1285 /// A successful subscription to the server will be notified through a `SubscriptionListener.onSubscription()`
1286 /// event.
1287 ///
1288 /// # Parameters
1289 ///
1290 /// * `subsrciption_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1291 /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1292 /// values.
1293 ///
1294 /// See also `unsubscribe()`
1295 pub async fn subscribe(
1296 subscription_sender: Sender<SubscriptionRequest>,
1297 subscription: Subscription,
1298 ) {
1299 subscription_sender
1300 .send(SubscriptionRequest {
1301 subscription: Some(subscription),
1302 subscription_id: None,
1303 })
1304 .await
1305 .unwrap()
1306 }
1307
1308 /// If you want to be able to unsubscribe from a subscription, you need to keep track of the id
1309 /// of the subscription. This blocking method allows you to wait for the id of the subscription
1310 /// to be returned.
1311 ///
1312 /// # Parameters
1313 ///
1314 /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1315 /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1316 ///
1317 pub async fn subscribe_get_id(
1318 subscription_sender: Sender<SubscriptionRequest>,
1319 mut subscription: Subscription,
1320 ) -> Result<usize, Box<dyn Error + Send + Sync>> {
1321 // Extract the id_receiver before sending the subscription
1322 let mut id_receiver = subscription.id_receiver;
1323
1324 // Create a new channel for the subscription we're about to send
1325 let (_new_sender, new_receiver) = channel(1);
1326 subscription.id_receiver = new_receiver;
1327
1328 // Send the subscription
1329 LightstreamerClient::subscribe(subscription_sender, subscription).await;
1330
1331 // Wait for the ID to be updated through the channel
1332 match id_receiver.recv().await {
1333 Some(id) => Ok(id),
1334 None => Err(Box::new(IllegalStateException::new(
1335 "Failed to get subscription id",
1336 ))),
1337 }
1338 }
1339
1340 /*
1341
1342 pub async fn subscribe_get_id(
1343 subscription_sender: Sender<SubscriptionRequest>,
1344 subscription: Subscription,
1345 ) -> Result<usize, Box<dyn Error + Send + Sync>> {
1346 let mut id_receiver = subscription.id_receiver.clone();
1347 LightstreamerClient::subscribe(subscription_sender.clone(), subscription);
1348
1349 match id_receiver.changed().await {
1350 Ok(_) => Ok(*id_receiver.borrow()),
1351 Err(_) => Err(Box::new(IllegalStateException::new(
1352 "Failed to get subscription id",
1353 ))),
1354 }
1355 }
1356 */
1357
1358 /// Operation method that removes a `Subscription` that is currently in the "active" state.
1359 ///
1360 /// By bringing back a `Subscription` to the "inactive" state, the unsubscription from all its
1361 /// items is requested to Lightstreamer Server.
1362 ///
1363 /// Subscription can be unsubscribed from at any time. Once done the `Subscription` immediately
1364 /// exits the "active" state.
1365 ///
1366 /// Note that forwarding of the unsubscription to the server is made in a separate thread.
1367 ///
1368 /// The unsubscription will be notified through a `SubscriptionListener.onUnsubscription()` event.
1369 ///
1370 /// # Parameters
1371 ///
1372 /// * `subsrciption_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1373 /// * `subscription_id`: The id of the subscription to be unsubscribed from.
1374 /// instance.
1375 pub async fn unsubscribe(
1376 subscription_sender: Sender<SubscriptionRequest>,
1377 subscription_id: usize,
1378 ) {
1379 subscription_sender
1380 .send(SubscriptionRequest {
1381 subscription: None,
1382 subscription_id: Some(subscription_id),
1383 })
1384 .await
1385 .unwrap()
1386 }
1387
1388 /// Method setting enum for the logging of this instance.
1389 ///
1390 /// Default logging type is StdLogs, corresponding to `stdout`
1391 ///
1392 /// `LightstreamerClient` has methods for logging that are compatible with the `Tracing` crate.
1393 /// Enabling logging for the `Tracing` crate requires implementation of a tracing subscriber
1394 /// and its configuration and formatting.
1395 ///
1396 /// # Parameters
1397 ///
1398 /// * `logging`: An enum declaring the logging type of this `LightstreamerClient` instance.
1399 pub fn set_logging_type(&mut self, logging: LogType) {
1400 self.logging = logging;
1401 }
1402
1403 /// Method for logging messages
1404 ///
1405 /// Match case wraps log types. `loglevel` param ignored in StdLogs case, all output to stdout.
1406 ///
1407 /// # Parameters
1408 ///
1409 /// * `loglevel` Enum determining use of stdout or Tracing subscriber.
1410 pub fn make_log(&mut self, loglevel: Level, log: &str) {
1411 match self.logging {
1412 LogType::StdLogs => {
1413 debug!("{}", log);
1414 }
1415 LogType::TracingLogs => match loglevel {
1416 Level::INFO => {
1417 info!(log);
1418 }
1419 Level::WARN => {
1420 warn!(log);
1421 }
1422 Level::ERROR => {
1423 error!(log);
1424 }
1425 Level::TRACE => {
1426 trace!(log);
1427 }
1428 Level::DEBUG => {
1429 debug!(log);
1430 }
1431 },
1432 }
1433 }
1434
1435 /// Enables automatic reconnection with default configuration.
1436 pub fn enable_auto_reconnect(&mut self) {
1437 self.auto_reconnect_enabled = true;
1438 // ConnectionManager will be created during connect() to avoid circular dependency
1439 }
1440
1441 /// Enables automatic reconnection with the specified configuration.
1442 ///
1443 /// # Parameters
1444 ///
1445 /// * `reconnection_config`: Configuration for reconnection behavior
1446 /// * `heartbeat_config`: Configuration for heartbeat monitoring
1447 pub fn enable_auto_reconnect_with_config(
1448 &mut self,
1449 reconnection_config: ReconnectionConfig,
1450 heartbeat_config: HeartbeatConfig,
1451 ) -> Result<(), Box<dyn Error>> {
1452 self.reconnection_config = reconnection_config;
1453 self.heartbeat_config = heartbeat_config;
1454 self.auto_reconnect_enabled = true;
1455 // ConnectionManager will be created during connect() to avoid circular dependency
1456 Ok(())
1457 }
1458
1459 /// Disables automatic reconnection.
1460 pub async fn disable_auto_reconnect(&mut self) {
1461 self.auto_reconnect_enabled = false;
1462 if let Some(manager) = &self.connection_manager {
1463 manager.shutdown().await;
1464 }
1465 self.connection_manager = None;
1466 }
1467
1468 /// Returns whether automatic reconnection is currently enabled.
1469 pub fn is_auto_reconnect_enabled(&self) -> bool {
1470 self.auto_reconnect_enabled
1471 }
1472
1473 /// Gets the current reconnection configuration.
1474 pub fn get_reconnection_config(&self) -> &ReconnectionConfig {
1475 &self.reconnection_config
1476 }
1477
1478 /// Gets the current heartbeat configuration.
1479 pub fn get_heartbeat_config(&self) -> &HeartbeatConfig {
1480 &self.heartbeat_config
1481 }
1482
1483 /// Updates the reconnection configuration.
1484 pub fn set_reconnection_config(&mut self, config: ReconnectionConfig) {
1485 self.reconnection_config = config;
1486 // Note: ConnectionManager doesn't support runtime config updates
1487 // The configuration is set during ConnectionManager creation
1488 }
1489
1490 /// Updates the heartbeat configuration.
1491 pub fn set_heartbeat_config(&mut self, config: HeartbeatConfig) {
1492 self.heartbeat_config = config;
1493 // Note: ConnectionManager doesn't support runtime config updates
1494 // The configuration is set during ConnectionManager creation
1495 }
1496
1497 /// Gets the current connection state from the connection manager.
1498 pub async fn get_connection_state(&self) -> ConnectionState {
1499 if let Some(manager) = &self.connection_manager {
1500 manager.get_connection_state().await
1501 } else {
1502 ConnectionState::Disconnected
1503 }
1504 }
1505
1506 /// Gets connection metrics from the connection manager.
1507 pub async fn get_connection_metrics(&self) -> crate::connection::management::ConnectionMetrics {
1508 if let Some(manager) = &self.connection_manager {
1509 manager.get_metrics().await
1510 } else {
1511 crate::connection::management::ConnectionMetrics::default()
1512 }
1513 }
1514
1515 /// Forces an immediate reconnection attempt if auto-reconnect is enabled.
1516 pub async fn force_reconnect(&mut self) -> Result<(), Box<dyn Error>> {
1517 if !self.auto_reconnect_enabled {
1518 return Err("Auto-reconnect is not enabled".into());
1519 }
1520
1521 if let Some(manager) = &mut self.connection_manager {
1522 manager.force_reconnect().await?;
1523 }
1524
1525 Ok(())
1526 }
1527}
1528
1529#[cfg(test)]
1530mod tests {
1531 use super::*;
1532 use crate::subscription::{Subscription, SubscriptionListener, SubscriptionMode};
1533 use std::error::Error;
1534 use std::fmt::Debug;
1535 use std::sync::{Arc, Mutex};
1536 use tokio::sync::Notify;
1537
/// Test double for `ClientListener` that records every notification it receives.
///
/// Each log sits behind an `Arc<Mutex<..>>` so that it can be shared with, and inspected by,
/// the test that created the listener.
#[derive(Debug)]
struct MockClientListener {
    /// Property names received through `on_property_change`, in arrival order.
    property_changes: Arc<Mutex<Vec<String>>>,
    /// Status strings received through `on_status_change`, in arrival order.
    status_changes: Arc<Mutex<Vec<String>>>,
    /// `(code, message)` pairs received through `on_server_error`, in arrival order.
    server_errors: Arc<Mutex<Vec<(i32, String)>>>,
}
1544
1545 impl MockClientListener {
1546 fn new() -> Self {
1547 MockClientListener {
1548 property_changes: Arc::new(Mutex::new(Vec::new())),
1549 status_changes: Arc::new(Mutex::new(Vec::new())),
1550 server_errors: Arc::new(Mutex::new(Vec::new())),
1551 }
1552 }
1553
1554 #[allow(dead_code)]
1555 fn with_shared_data(
1556 property_changes: Arc<Mutex<Vec<String>>>,
1557 status_changes: Arc<Mutex<Vec<String>>>,
1558 server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1559 ) -> Self {
1560 MockClientListener {
1561 property_changes,
1562 status_changes,
1563 server_errors,
1564 }
1565 }
1566 }
1567
1568 impl ClientListener for MockClientListener {
1569 fn on_property_change(&self, property: &str) {
1570 self.property_changes
1571 .lock()
1572 .unwrap()
1573 .push(property.to_string());
1574 }
1575
1576 fn on_status_change(&self, status: &str) {
1577 self.status_changes.lock().unwrap().push(status.to_string());
1578 }
1579
1580 fn on_server_error(&self, code: i32, message: &str) {
1581 self.server_errors
1582 .lock()
1583 .unwrap()
1584 .push((code, message.to_string()));
1585 }
1586 }
1587
1588 #[allow(dead_code)]
1589 struct MockSubscriptionListener;
1590
1591 impl SubscriptionListener for MockSubscriptionListener {
1592 fn on_subscription(&mut self) {}
1593 fn on_unsubscription(&mut self) {}
1594 fn on_item_update(&self, _update: &ItemUpdate) {}
1595 }
1596
1597 impl Debug for MockSubscriptionListener {
1598 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1599 write!(f, "MockSubscriptionListener")
1600 }
1601 }
1602
1603 #[allow(dead_code)]
1604 struct LightstreamerClientTestContext {
1605 client: LightstreamerClient,
1606 property_changes: Arc<Mutex<Vec<String>>>,
1607 status_changes: Arc<Mutex<Vec<String>>>,
1608 server_errors: Arc<Mutex<Vec<(i32, String)>>>,
1609 }
1610
1611 impl LightstreamerClientTestContext {
1612 #[allow(dead_code)]
1613 fn new() -> Result<Self, Box<dyn Error>> {
1614 let property_changes = Arc::new(Mutex::new(Vec::new()));
1615 let status_changes = Arc::new(Mutex::new(Vec::new()));
1616 let server_errors = Arc::new(Mutex::new(Vec::new()));
1617 let listener = MockClientListener::with_shared_data(
1618 Arc::clone(&property_changes),
1619 Arc::clone(&status_changes),
1620 Arc::clone(&server_errors),
1621 );
1622
1623 let mut client = LightstreamerClient::new(
1624 Some("http://test.lightstreamer.com"),
1625 Some("DEMO"),
1626 None,
1627 None,
1628 )?;
1629 client.add_listener(Box::new(listener));
1630
1631 Ok(LightstreamerClientTestContext {
1632 client,
1633 property_changes,
1634 status_changes,
1635 server_errors,
1636 })
1637 }
1638 }
1639
/// Exercises `LightstreamerClient::new` with anonymous, authenticated, invalid-address and
/// missing-address arguments.
#[test]
fn test_new_lightstreamer_client() {
    const ADDRESS: &str = "http://test.lightstreamer.com";
    const ADAPTER_SET: &str = "DEMO";

    // Address and adapter set only: both are stored on the client.
    let anonymous = LightstreamerClient::new(Some(ADDRESS), Some(ADAPTER_SET), None, None);
    assert!(anonymous.is_ok());
    let client = anonymous.unwrap();
    assert_eq!(client.server_address, Some(ADDRESS.to_string()));
    assert_eq!(client.adapter_set, Some(ADAPTER_SET.to_string()));

    // Credentials are exposed through the connection details.
    let authenticated =
        LightstreamerClient::new(Some(ADDRESS), Some(ADAPTER_SET), Some("user1"), Some("pass1"));
    assert!(authenticated.is_ok());
    let client = authenticated.unwrap();
    assert_eq!(
        client.connection_details.get_user(),
        Some(&"user1".to_string())
    );
    assert_eq!(
        client.connection_details.get_password(),
        Some(&"pass1".to_string())
    );

    // A malformed address is rejected.
    let malformed = LightstreamerClient::new(Some("invalid-url"), Some(ADAPTER_SET), None, None);
    assert!(malformed.is_err());

    // A missing address is accepted and left unset.
    let unaddressed = LightstreamerClient::new(None, Some(ADAPTER_SET), None, None);
    assert!(unaddressed.is_ok());
    let client = unaddressed.unwrap();
    assert_eq!(client.server_address, None);
}
1678
/// Each call to `add_listener` grows the client's listener list by one.
#[test]
fn test_add_listener() {
    let created = LightstreamerClient::new(
        Some("http://test.lightstreamer.com"),
        Some("DEMO"),
        None,
        None,
    );
    assert!(created.is_ok());
    let mut client = created.unwrap();

    // A fresh client starts without listeners.
    assert_eq!(client.listeners.len(), 0);

    // Register two listeners, checking the count after each one.
    for expected_count in 1..=2 {
        client.add_listener(Box::new(MockClientListener::new()));
        assert_eq!(client.listeners.len(), expected_count);
    }
}
1697
/// `get_listeners` reflects listeners registered through `add_listener`.
#[test]
fn test_get_listeners() {
    let created = LightstreamerClient::new(
        Some("http://test.lightstreamer.com"),
        Some("DEMO"),
        None,
        None,
    );
    assert!(created.is_ok());
    let mut client = created.unwrap();

    // Nothing registered yet.
    assert_eq!(client.get_listeners().len(), 0);

    // One registration, one listener reported.
    client.add_listener(Box::new(MockClientListener::new()));
    assert_eq!(client.get_listeners().len(), 1);
}
1714
/// A freshly constructed client reports `Disconnected(WillRetry)`.
#[test]
fn test_get_status() {
    let created = LightstreamerClient::new(
        Some("http://test.lightstreamer.com"),
        Some("DEMO"),
        None,
        None,
    );
    assert!(created.is_ok());
    let client = created.unwrap();

    assert!(
        matches!(
            client.get_status(),
            ClientStatus::Disconnected(DisconnectionType::WillRetry)
        ),
        "Expected initial status to be DISCONNECTED:WILL-RETRY"
    );
}
1730
/// A freshly constructed client has no subscriptions.
#[test]
fn test_get_subscriptions() {
    let created = LightstreamerClient::new(
        Some("http://test.lightstreamer.com"),
        Some("DEMO"),
        None,
        None,
    );
    assert!(created.is_ok());

    let client = created.unwrap();
    let subscription_count = client.get_subscriptions().len();
    assert_eq!(subscription_count, 0);
}
1743
1744 #[tokio::test]
1745 async fn test_connect_with_no_server_address() {
1746 let result = LightstreamerClient::new(None, Some("DEMO"), None, None);
1747 assert!(result.is_ok());
1748 let client = result.unwrap();
1749 let client_arc = Arc::new(tokio::sync::Mutex::new(client));
1750 let shutdown_signal = Arc::new(Notify::new());
1751 let result = LightstreamerClient::connect(client_arc, shutdown_signal).await;
1752 assert!(result.is_err());
1753 }
1754
1755 #[tokio::test]
1756 async fn test_forced_transport_validation() {
1757 let result = LightstreamerClient::new(
1758 Some("http://test.lightstreamer.com"),
1759 Some("DEMO"),
1760 None,
1761 None,
1762 );
1763 assert!(result.is_ok());
1764 let mut client = result.unwrap();
1765
1766 client.connection_options.set_forced_transport(None);
1767 let client_arc = Arc::new(tokio::sync::Mutex::new(client));
1768 let shutdown_signal = Arc::new(Notify::new());
1769 let result = LightstreamerClient::connect(client_arc.clone(), shutdown_signal).await;
1770 assert!(result.is_err());
1771 client_arc
1772 .lock()
1773 .await
1774 .connection_options
1775 .set_forced_transport(Some(Transport::WsStreaming));
1776 }
1777
/// The subscription request for a MERGE subscription carries the expected
/// `LS_*` parameters.
#[test]
fn test_subscription_params_generation() {
    let items = vec!["item1".to_string(), "item2".to_string()];
    let fields = vec!["field1".to_string(), "field2".to_string()];
    let subscription =
        Subscription::new(SubscriptionMode::Merge, Some(items), Some(fields)).unwrap();

    let generated = LightstreamerClient::get_subscription_params(&subscription, 1);
    assert!(generated.is_ok());
    let query = generated.unwrap();

    // Every one of these fragments must appear in the generated parameters.
    let expected_fragments = [
        "LS_reqId=1",
        "LS_op=add",
        "LS_subId=",
        "LS_mode=MERGE",
        "LS_group=",
        "LS_schema=",
    ];
    for fragment in expected_fragments {
        assert!(query.contains(fragment), "missing `{fragment}` in `{query}`");
    }
}
1798
/// The unsubscription request carries the request id, the `delete` operation
/// and the subscription id.
#[test]
fn test_unsubscription_params_generation() {
    let generated = LightstreamerClient::get_unsubscription_params(42, 123);
    assert!(generated.is_ok());
    let query = generated.unwrap();

    // Every one of these fragments must appear in the generated parameters.
    for fragment in ["LS_reqId=123", "LS_op=delete", "LS_subId=42"] {
        assert!(query.contains(fragment), "missing `{fragment}` in `{query}`");
    }
}
1809
/// Smoke test: `make_log` can be called at INFO and DEBUG level under each
/// logging type without panicking.
#[test]
fn test_logging_functions() {
    let created = LightstreamerClient::new(
        Some("http://test.lightstreamer.com"),
        Some("DEMO"),
        None,
        None,
    );
    assert!(created.is_ok());
    let mut client = created.unwrap();

    // (logging type, INFO message, DEBUG message), exercised in this order.
    let scenarios = [
        (LogType::StdLogs, "Test log message", "Test debug message"),
        (
            LogType::TracingLogs,
            "Test tracing log message",
            "Test tracing debug message",
        ),
    ];
    for (log_type, info_text, debug_text) in scenarios {
        client.set_logging_type(log_type);
        client.make_log(Level::INFO, info_text);
        client.make_log(Level::DEBUG, debug_text);
    }
}
1829
/// The `Debug` rendering of a client names its main fields and includes the
/// configured address and adapter set.
#[test]
fn test_debug_implementation() {
    let created = LightstreamerClient::new(
        Some("http://test.lightstreamer.com"),
        Some("DEMO"),
        None,
        None,
    );
    assert!(created.is_ok());
    let client = created.unwrap();

    // Formatting must not panic.
    let rendered = format!("{:?}", client);

    // Field names expected in the output.
    let field_names = [
        "server_address",
        "adapter_set",
        "connection_details",
        "connection_options",
        "listeners",
        "subscriptions",
    ];
    for field in field_names {
        assert!(rendered.contains(field), "missing `{field}` in `{rendered}`");
    }

    // Configured values expected in the output.
    for value in ["http://test.lightstreamer.com", "DEMO"] {
        assert!(rendered.contains(value), "missing `{value}` in `{rendered}`");
    }
}
1856
/// `add_cookies` is expected to panic with its "not yet implemented" message.
#[test]
#[should_panic(expected = "Implement mechanism to add cookies to LightstreamerClient")]
fn test_add_cookies() {
    let uri = "http://test.lightstreamer.com";
    let test_cookie = Cookie::new("test_cookie", "test_value");

    // Associated function: no client instance is needed.
    LightstreamerClient::add_cookies(uri, &test_cookie);
}
1864
/// `get_cookies` is expected to panic with a "not implemented" message.
#[test]
#[should_panic(expected = "not implemented")]
fn test_get_cookies() {
    let uri = Some("http://test.lightstreamer.com");

    // Associated function: no client instance is needed.
    LightstreamerClient::get_cookies(uri);
}
1871}