lightstreamer_client/ls_client.rs
1use crate::client_listener::ClientListener;
2use crate::client_message_listener::ClientMessageListener;
3use crate::connection_details::ConnectionDetails;
4use crate::connection_options::ConnectionOptions;
5use crate::error::IllegalStateException;
6use crate::item_update::ItemUpdate;
7use crate::subscription::{Snapshot, Subscription, SubscriptionMode};
8use crate::util::*;
9
10use cookie::Cookie;
11use futures_util::{SinkExt, StreamExt};
12use std::collections::HashMap;
13use std::error::Error;
14use std::fmt::{self, Debug, Formatter};
15use std::sync::Arc;
16use tokio::sync::{
17 mpsc::{self, Receiver, Sender},
18 Notify,
19};
20use tokio_tungstenite::{
21 connect_async,
22 tungstenite::{
23 http::{HeaderName, HeaderValue, Request},
24 Message,
25 },
26};
27use tracing::{debug, error, info, instrument, trace, warn, Level};
28use url::Url;
29
/// Represents the current status of the `LightstreamerClient`.
///
/// The `Connected` and `Disconnected` variants carry a payload that refines the state
/// (see `ConnectionType` and `DisconnectionType`).
pub enum ClientStatus {
    /// A connection attempt is in progress.
    Connecting,
    /// The client is connected; the payload tells which kind of connection is in use.
    Connected(ConnectionType),
    /// NOTE(review): presumably the server has stopped sending data on an open connection —
    /// confirm against the code that sets this status.
    Stalled,
    /// The client is disconnected; the payload tells what the client intends to do next.
    Disconnected(DisconnectionType),
}
37
/// Kind of connection reported by `ClientStatus::Connected`.
///
/// NOTE(review): `connect()` currently only accepts a forced `WsStreaming` transport; the other
/// variants are declared here but their use is not visible in this part of the file.
pub enum ConnectionType {
    /// Polling over HTTP.
    HttpPolling,
    /// Streaming over HTTP.
    HttpStreaming,
    /// NOTE(review): presumably the transient "Stream-Sense" phase in which the transport is
    /// still being selected — confirm.
    StreamSensing,
    /// Polling over WebSocket.
    WsPolling,
    /// Streaming over WebSocket.
    WsStreaming,
}
45
/// Detail reported by `ClientStatus::Disconnected`.
pub enum DisconnectionType {
    /// NOTE(review): presumably a new connection attempt will follow — confirm.
    WillRetry,
    /// NOTE(review): presumably a recovery of the previous session is being attempted — confirm.
    TryingRecovery,
}
50
/// Selects the logging backend stored in `LightstreamerClient::logging`.
///
/// NOTE(review): the code that interprets this value (`make_log`) is not visible here; the
/// variant descriptions below are inferred from the names and should be confirmed.
pub enum LogType {
    /// Presumably emit log records through the `tracing` macros.
    TracingLogs,
    /// Presumably emit log records through the standard output facilities.
    StdLogs,
}
55
/// A subscribe or unsubscribe command sent to a running client through
/// `LightstreamerClient::subscription_sender`.
///
/// `connect()` treats a request whose `subscription` is `Some` as a subscribe command; otherwise,
/// if `subscription_id` is `Some`, it is treated as an unsubscribe command. A request with both
/// fields `None` is ignored.
pub struct SubscriptionRequest {
    /// The subscription to add; `Some` for subscribe requests.
    subscription: Option<Subscription>,
    /// The id of the subscription to remove; consulted only when `subscription` is `None`.
    subscription_id: Option<usize>,
}
60
/// Facade class for the management of the communication to Lightstreamer Server. Used to provide
/// configuration settings, event handlers, operations for the control of the connection lifecycle,
/// Subscription handling and to send messages.
///
/// An instance of `LightstreamerClient` handles the communication with Lightstreamer Server on a
/// specified endpoint. Hence, it hosts one "Session"; or, more precisely, a sequence of Sessions,
/// since any Session may fail and be recovered, or it can be interrupted on purpose. So, normally,
/// a single instance of `LightstreamerClient` is needed.
///
/// However, multiple instances of `LightstreamerClient` can be used, toward the same or multiple
/// endpoints.
///
/// You can listen to the events generated by a session by registering an event listener, such as
/// `ClientListener` or `SubscriptionListener`. These listeners allow you to handle various events,
/// such as session creation, connection status, subscription updates, and server messages.
///
/// NOTE(review): in the code visible here, subscription listeners are invoked inline from the
/// `connect()` message loop, so a slow or blocking listener delays the processing of every
/// following message. Keep listener methods fast and delegate heavy work elsewhere.
///
/// # Parameters
///
/// * `server_address`: the address of the Lightstreamer Server to which this `LightstreamerClient`
///   will connect to. It is possible to specify it later by using `None` here. See
///   `ConnectionDetails.setServerAddress()` for details.
/// * `adapter_set`: the name of the Adapter Set mounted on Lightstreamer Server to be used to handle
///   all requests in the Session associated with this `LightstreamerClient`. It is possible not to
///   specify it at all or to specify it later by using `None` here. See `ConnectionDetails.setAdapterSet()`
///   for details.
///
/// # Raises
///
/// * `IllegalArgumentException`: if an invalid address is passed. See `ConnectionDetails.setServerAddress()`
///   for details.
pub struct LightstreamerClient {
    /// The address of the Lightstreamer Server to which this `LightstreamerClient` will connect.
    server_address: Option<String>,
    /// The name of the Adapter Set mounted on Lightstreamer Server to be used to handle all
    /// requests in the Session associated with this `LightstreamerClient`.
    adapter_set: Option<String>,
    /// Data object that contains the details needed to open a connection to a Lightstreamer Server.
    /// This instance is set up by the `LightstreamerClient` object at its own creation. Properties
    /// of this object can be overwritten by values received from a Lightstreamer Server.
    pub connection_details: ConnectionDetails,
    /// Data object that contains options and policies for the connection to the server. This instance
    /// is set up by the `LightstreamerClient` object at its own creation. Properties of this object
    /// can be overwritten by values received from a Lightstreamer Server.
    pub connection_options: ConnectionOptions,
    /// A list of listeners that will receive events from the `LightstreamerClient` instance.
    listeners: Vec<Box<dyn ClientListener>>,
    /// A list containing all the `Subscription` instances that are currently "active" on this
    /// `LightstreamerClient`.
    subscriptions: Vec<Subscription>,
    /// The current status of the client.
    status: ClientStatus,
    /// The logging backend to be used.
    logging: LogType,
    /// The sending half of the channel used to submit subscribe/unsubscribe requests to a
    /// running `connect()` loop.
    pub subscription_sender: Sender<SubscriptionRequest>,
    /// The receiving half of the subscribe/unsubscribe channel; drained by `connect()`.
    subscription_receiver: Receiver<SubscriptionRequest>,
}
126
127/// Retrieve a reference to a subscription with the given `id`
128fn get_subscription_by_id(
129 subscriptions: &Vec<Subscription>,
130 subscription_id: usize,
131) -> Option<&Subscription> {
132 subscriptions
133 .iter()
134 .find(|sub| sub.id == subscription_id)
135}
136
137impl Debug for LightstreamerClient {
138 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
139 f.debug_struct("LightstreamerClient")
140 .field("server_address", &self.server_address)
141 .field("adapter_set", &self.adapter_set)
142 .field("connection_details", &self.connection_details)
143 .field("connection_options", &self.connection_options)
144 .field("listeners", &self.listeners)
145 .field("subscriptions", &self.subscriptions)
146 .finish()
147 }
148}
149
150impl LightstreamerClient {
/// A constant string representing the name of the library.
pub const LIB_NAME: &'static str = "rust_client";

/// A constant string representing the version of the library.
pub const LIB_VERSION: &'static str = "0.1.0";

//
// Constants for WebSocket connection.
//
/// Value sent in the `sec-websocket-key` header of the WebSocket handshake.
///
/// NOTE(review): this is a fixed value reused for every connection; RFC 6455 describes the key
/// as a randomly selected nonce — confirm that a constant is acceptable here.
pub const SEC_WEBSOCKET_KEY: &'static str = "PNDUibe9ex7PnsrLbt0N4w==";
/// Value sent in the `sec-websocket-protocol` header of the WebSocket handshake.
pub const SEC_WEBSOCKET_PROTOCOL: &'static str = "TLCP-2.4.0.lightstreamer.com";
/// Value sent in the `sec-websocket-version` header of the WebSocket handshake.
pub const SEC_WEBSOCKET_VERSION: &'static str = "13";
/// Value sent in the `upgrade` header of the WebSocket handshake.
pub const SEC_WEBSOCKET_UPGRADE: &'static str = "websocket";

/// A constant string representing the version of the TLCP protocol used by the library.
/// It is sent as the `LS_protocol` parameter of the `create_session` request.
pub const TLCP_VERSION: &'static str = "TLCP-2.4.0";
167
/// Static method that can be used to share cookies between connections to the Server (performed by
/// this library) and connections to other sites that are performed by the application. With this
/// method, cookies received by the application can be added (or replaced if already present) to
/// the cookie set used by the library to access the Server. Obviously, only cookies whose domain
/// is compatible with the Server domain will be used internally.
///
/// This method should be invoked before calling the `LightstreamerClient.connect()` method.
/// However it can be invoked at any time; it will affect the internal cookie set immediately
/// and the sending of cookies on the next HTTP request or WebSocket establishment.
///
/// The paragraphs above describe the intended contract; the method is not implemented yet.
///
/// # Parameters
///
/// * `_uri`: the URI from which the supplied cookies were received.
/// * `_cookies`: the cookie to add, as a `cookie::Cookie`.
///
/// # Panics
///
/// Always panics: the body is an `unimplemented!` placeholder.
///
/// See also `getCookies()`
pub fn add_cookies(_uri: &str, _cookies: &Cookie) {
    // Implementation for add_cookies
    unimplemented!("Implement mechanism to add cookies to LightstreamerClient");
}
188
/// Adds a listener that will receive events from the `LightstreamerClient` instance.
///
/// A listener can be added at any time.
///
/// NOTE(review): the listener is appended unconditionally. No check for an already registered
/// listener is performed here, so adding the same listener twice stores it twice.
///
/// # Parameters
///
/// * `listener`: An object that will receive the events as documented in the `ClientListener`
///   interface. Ownership of the boxed listener moves into the client.
///
/// See also `removeListener()`
pub fn add_listener(&mut self, listener: Box<dyn ClientListener>) {
    self.listeners.push(listener);
}
204
205 /// Packs s string with the necessary parameters for a subscription request.
206 ///
207 /// # Parameters
208 ///
209 /// * `subscription`: The subscription for which to get the parameters.
210 /// * `request_id`: The request ID to use in the parameters.
211 ///
212 fn get_subscription_params(
213 subscription: &Subscription,
214 request_id: usize,
215 ) -> Result<String, Box<dyn Error + Send + Sync>> {
216 let ls_req_id = request_id.to_string();
217 let ls_sub_id = subscription.id.to_string();
218 let ls_mode = subscription.get_mode().to_string();
219 let ls_group = match subscription.get_item_group() {
220 Some(item_group) => item_group.to_string(),
221 None => match subscription.get_items() {
222 Some(items) => items.join(" "),
223 None => {
224 return Err(Box::new(std::io::Error::new(
225 std::io::ErrorKind::InvalidData,
226 "No item group or items found in subscription.",
227 )));
228 }
229 },
230 };
231 let ls_schema = match subscription.get_field_schema() {
232 Some(field_schema) => field_schema.to_string(),
233 None => match subscription.get_fields() {
234 Some(fields) => fields.join(" "),
235 None => {
236 return Err(Box::new(std::io::Error::new(
237 std::io::ErrorKind::InvalidData,
238 "No field schema or fields found in subscription.",
239 )));
240 }
241 },
242 };
243 let ls_data_adapter = match subscription.get_data_adapter() {
244 Some(data_adapter) => data_adapter.to_string(),
245 None => "".to_string(),
246 };
247 let ls_snapshot = subscription
248 .get_requested_snapshot()
249 .unwrap_or_default()
250 .to_string();
251 //
252 // Prepare the subscription request.
253 //
254 let mut params: Vec<(&str, &str)> = vec![
255 ("LS_data_adapter", &ls_data_adapter),
256 ("LS_reqId", &ls_req_id),
257 ("LS_op", "add"),
258 ("LS_subId", &ls_sub_id),
259 ("LS_mode", &ls_mode),
260 ("LS_group", &ls_group),
261 ("LS_schema", &ls_schema),
262 ("LS_ack", "false"),
263 ];
264 // Remove the data adapter parameter if not specified.
265 if ls_data_adapter == "" {
266 params.remove(0);
267 }
268 if ls_snapshot != "" {
269 params.push(("LS_snapshot", &ls_snapshot));
270 }
271
272 Ok(serde_urlencoded::to_string(¶ms)?)
273 }
274
/// Packs a string with the necessary parameters for an unsubscription (`LS_op=delete`) request.
///
/// Every value in the request is either a fixed ASCII token or the decimal rendering of an
/// unsigned integer, so nothing needs percent-encoding and the string is assembled directly.
/// The result is `LS_reqId=<request_id>&LS_op=delete&LS_subId=<subscription_id>`.
///
/// # Parameters
///
/// * `subscription_id`: The id of the subscription to delete.
/// * `request_id`: The request ID to use in the parameters.
///
/// # Errors
///
/// This implementation never fails; the `Result` return type is kept so that existing callers
/// that handle an error keep compiling.
fn get_unsubscription_params(
    subscription_id: usize,
    request_id: usize,
) -> Result<String, Box<dyn Error + Send + Sync>> {
    Ok(format!(
        "LS_reqId={}&LS_op=delete&LS_subId={}",
        request_id, subscription_id
    ))
}
292
293 /// Operation method that requests to open a Session against the configured Lightstreamer Server.
294 ///
295 /// When `connect()` is called, unless a single transport was forced through `ConnectionOptions.setForcedTransport()`,
296 /// the so called "Stream-Sense" mechanism is started: if the client does not receive any answer
297 /// for some seconds from the streaming connection, then it will automatically open a polling
298 /// connection.
299 ///
300 /// A polling connection may also be opened if the environment is not suitable for a streaming
301 /// connection.
302 ///
303 /// Note that as "polling connection" we mean a loop of polling requests, each of which requires
304 /// opening a synchronous (i.e. not streaming) connection to Lightstreamer Server.
305 ///
306 /// Note that the request to connect is accomplished by the client in a separate thread; this
307 /// means that an invocation to `getStatus()` right after `connect()` might not reflect the change
308 /// yet.
309 ///
310 /// When the request to connect is finally being executed, if the current status of the client
311 /// is not `DISCONNECTED`, then nothing will be done.
312 ///
313 /// # Raises
314 ///
315 /// * `IllegalStateException`: if no server address was configured.
316 ///
317 /// See also `getStatus()`
318 ///
319 /// See also `disconnect()`
320 ///
321 /// See also `ClientListener.onStatusChange()`
322 ///
323 /// See also `ConnectionDetails.setServerAddress()`
324 #[instrument]
325 pub async fn connect(&mut self, shutdown_signal: Arc<Notify>) -> Result<(), Box<dyn Error + Send + Sync>> {
326 // Check if the server address is configured.
327 if self.server_address.is_none() {
328 return Err(Box::new(IllegalStateException::new(
329 "No server address was configured.",
330 )));
331 }
332 //
333 // Only WebSocket streaming transport is currently supported.
334 //
335 let forced_transport = self.connection_options.get_forced_transport();
336 if forced_transport.is_none()
337 || *forced_transport.unwrap() /* unwrap() is safe here */ != Transport::WsStreaming
338 {
339 return Err(Box::new(IllegalStateException::new(
340 "Only WebSocket streaming transport is currently supported.",
341 )));
342 }
343 //
344 // Convert the HTTP URL to a WebSocket URL.
345 //
346 let http_url = self.connection_details.get_server_address().unwrap(); // unwrap() is safe here.
347 let mut url = Url::parse(&http_url)
348 .expect("Failed to parse server address URL from connection details.");
349 match url.scheme() {
350 "http" => url
351 .set_scheme("ws")
352 .expect("Failed to set scheme to ws for WebSocket URL."),
353 "https" => url
354 .set_scheme("wss")
355 .expect("Failed to set scheme to wss for WebSocket URL."),
356 invalid_scheme => {
357 return Err(Box::new(IllegalStateException::new(&format!(
358 "Unsupported scheme '{}' found when converting HTTP URL to WebSocket URL.",
359 invalid_scheme
360 ))));
361 }
362 }
363 let ws_url = url.as_str();
364
365 // Build the WebSocket request with the necessary headers.
366 let request = Request::builder()
367 .uri(ws_url)
368 .header(
369 HeaderName::from_static("connection"),
370 HeaderValue::from_static("Upgrade"),
371 )
372 .header(
373 HeaderName::from_static("host"),
374 HeaderValue::from_str(url.host_str().unwrap_or("localhost")).map_err(|err| {
375 IllegalStateException::new(&format!(
376 "Invalid header value for header with name 'host': {}",
377 err
378 ))
379 })?,
380 )
381 .header(
382 HeaderName::from_static("sec-websocket-key"),
383 HeaderValue::from_static(Self::SEC_WEBSOCKET_KEY),
384 )
385 .header(
386 HeaderName::from_static("sec-websocket-protocol"),
387 HeaderValue::from_static(Self::SEC_WEBSOCKET_PROTOCOL),
388 )
389 .header(
390 HeaderName::from_static("sec-websocket-version"),
391 HeaderValue::from_static(Self::SEC_WEBSOCKET_VERSION),
392 )
393 .header(
394 HeaderName::from_static("upgrade"),
395 HeaderValue::from_static(Self::SEC_WEBSOCKET_UPGRADE),
396 )
397 .body(())?;
398
399 // Connect to the Lightstreamer server using WebSocket.
400 let ws_stream = match connect_async(request).await {
401 Ok((ws_stream, response)) => {
402 if let Some(server_header) = response.headers().get("server") {
403 self.make_log(
404 Level::INFO,
405 &format!(
406 "Connected to Lightstreamer server: {}",
407 server_header.to_str().unwrap_or("")
408 ),
409 );
410 } else {
411 self.make_log(Level::INFO, "Connected to Lightstreamer server");
412 }
413 ws_stream
414 }
415 Err(err) => {
416 return Err(Box::new(std::io::Error::new(
417 std::io::ErrorKind::ConnectionRefused,
418 format!(
419 "Failed to connect to Lightstreamer server with WebSocket: {}",
420 err
421 ),
422 )));
423 }
424 };
425
426 // Split the WebSocket stream into a write and a read stream.
427 let (mut write_stream, mut read_stream) = ws_stream.split();
428
429 //
430 // Initiate communication with the server by sending a 'wsok' message.
431 //
432 write_stream.send(Message::Text("wsok".into())).await?;
433
434 //
435 // Start reading and processing messages from the server.
436 //
437 let mut is_connected = false;
438 let mut request_id: usize = 0;
439 let mut _session_id: Option<String> = None;
440 let mut subscription_id: usize = 0;
441 let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> =
442 HashMap::new();
443 loop {
444 tokio::select! {
445 message = read_stream.next() => {
446 match message {
447 Some(Ok(Message::Text(text))) => {
448 // Messages could include multiple submessages separated by /r/n.
449 // Split the message into submessages and process each one separately.
450 let submessages: Vec<&str> = text.split("\r\n")
451 .filter(|&line| !line.trim().is_empty()) // Filter out empty lines.
452 .collect();
453 for submessage in submessages {
454 let clean_text = clean_message(&submessage);
455 let submessage_fields: Vec<&str> = clean_text.split(",").collect();
456 match *submessage_fields.first().unwrap_or(&"") {
457 //
458 // Errors from server.
459 //
460 "conerr" | "reqerr" => {
461 self.make_log( Level::ERROR, &format!("Received connection error from Lightstreamer server: {}", clean_text) );
462 break;
463 },
464 //
465 // Session created successfully.
466 //
467 "conok" => {
468 is_connected = true;
469 if let Some(session_id) = submessage_fields.get(1).as_deref() {
470 self.make_log( Level::DEBUG, &format!("Session creation confirmed by server: {}", clean_text) );
471 self.make_log( Level::DEBUG, &format!("Session created with ID: {:?}", session_id) );
472 //
473 // Subscribe to the desired items.
474 //
475 while let Some(subscription) = self.subscriptions.get_mut(subscription_id) {
476 //
477 // Gather all the necessary subscription parameters.
478 //
479 subscription_id += 1;
480 request_id += 1;
481 subscription.id = subscription_id;
482 subscription.id_sender.send(subscription_id)?;
483
484 let encoded_params = match Self::get_subscription_params(subscription, request_id)
485 {
486 Ok(params) => params,
487 Err(err) => {
488 return Err(err);
489 },
490 };
491
492 write_stream
493 .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
494 .await?;
495 info!("Sent subscription request: '{}'", encoded_params);
496 }
497 } else {
498 return Err(Box::new(std::io::Error::new(
499 std::io::ErrorKind::InvalidData,
500 "Session ID not found in 'conok' message from server",
501 )));
502 }
503 },
504 //
505 // Notifications from server.
506 //
507 "conf" | "cons" | "clientip" | "servname" | "prog" | "sync" | "eos" => {
508 self.make_log( Level::INFO, &format!("Received notification from server: {}", clean_text) );
509 // Don't do anything with these notifications for now.
510 },
511 "probe" => {
512 self.make_log( Level::DEBUG, &format!("Received probe message from server: {}", clean_text ) );
513 },
514 "reqok" => {
515 self.make_log( Level::DEBUG, &format!("Received reqok message from server: '{}'", clean_text ) );
516 },
517 //
518 // Subscription confirmation from server.
519 //
520 "subok" => {
521 self.make_log( Level::INFO, &format!("Subscription confirmed by server: '{}'", clean_text) );
522 },
523 //
524 // Usubscription confirmation from server.
525 //
526 "unsub" => {
527 self.make_log( Level::INFO, &format!("Unsubscription confirmed by server: '{}'", clean_text) );
528 },
529 //
530 // Data updates from server.
531 //
532 "u" => {
533 // Parse arguments from the received message.
534 let arguments = parse_arguments(&clean_text);
535 //
536 // Extract the subscription from the first argument.
537 //
538 let subscription_index = arguments.get(1).unwrap_or(&"").parse::<usize>().unwrap_or(0);
539 let subscription = match get_subscription_by_id(self.get_subscriptions(), subscription_index) {
540 Some(subscription) => subscription,
541 None => {
542 self.make_log( Level::WARN, &format!("Subscription not found for index: {}", subscription_index) );
543 continue;
544
545 }
546 };
547 //
548 // Extract the item from the second argument.
549 //
550 let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0);
551 let item = match subscription.get_items() {
552 Some(items) => items.get(item_index-1),
553 None => {
554 self.make_log( Level::WARN, &format!("No items found in subscription: {:?}", subscription) );
555 continue;
556 }
557 };
558 //
559 // Determine if the update is a snapshot or real-time update based on the subscription parameters.
560 //
561 let is_snapshot = match subscription.get_requested_snapshot() {
562 Some(ls_snapshot) => {
563 match ls_snapshot {
564 Snapshot::No => false,
565 Snapshot::Yes => {
566 match subscription.get_mode() {
567 SubscriptionMode::Merge => {
568 if arguments.len() == 4 && arguments[3] == "$" {
569 // EOS notification received
570 true
571 } else {
572 // If item doesn't exist in item_updates yet, the first update
573 // is always a snapshot.
574 if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) {
575 if let Some(_) = item_updates.get(&(item_index)) {
576 // Item update already exists in item_updates, so it's not a snapshot.
577 false
578 } else {
579 // Item update doesn't exist in item_updates, so the first update is always a snapshot.
580 true
581 }
582 } else {
583 // Item updates not found for subscription, so the first update is always a snapshot.
584 true
585 }
586 }
587 },
588 SubscriptionMode::Distinct | SubscriptionMode::Command => {
589 if !subscription.is_subscribed() {
590 true
591 } else {
592 false
593 }
594 },
595 _ => false,
596 }
597 },
598 _ => false,
599 }
600 },
601 None => false,
602 };
603
604 // Extract the field values from the third argument.
605 let field_values: Vec<&str> = arguments.get(3).unwrap_or(&"").split('|').collect();
606
607 //
608 // Get fields from subscription and create a HashMap of field names and values.
609 //
610 let subscription_fields = subscription.get_fields();
611 let mut field_map: HashMap<String, Option<String>> = subscription_fields
612 .map(|fields| fields.iter().map(|field_name| (field_name.to_string(), None)).collect())
613 .unwrap_or_default();
614
615 let mut field_index = 0;
616 for value in field_values {
617 match value {
618 "" => {
619 // An empty value means the field is unchanged compared to the previous update of the same field.
620 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
621 field_map.insert(field_name.to_string(), None);
622 }
623 field_index += 1;
624 }
625 "#" | "$" => {
626 // A value corresponding to a hash sign "#" or dollar sign "$" means the field is null or empty.
627 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
628 field_map.insert(field_name.to_string(), Some("".to_string()));
629 }
630 field_index += 1;
631 }
632 value if value.starts_with('^') => {
633 let command = value.chars().nth(1).unwrap_or(' ');
634 match command {
635 '0'..='9' => {
636 let count = value[1..].parse().unwrap_or(0);
637 for i in 0..count {
638 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index + i)) {
639 field_map.insert(field_name.to_string(), None);
640 }
641 }
642 field_index += count;
643 }
644 'P' | 'T' => {
645 let diff_value = serde_urlencoded::from_str(&value[2..]).unwrap_or_else(|_| value[2..].to_string());
646 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
647 if let Some(prev_value) = field_map.get(field_name).and_then(|v| v.as_ref()) {
648 let new_value = match command {
649 'P' => {
650 // Apply JSON Patch
651 let patch: serde_json::Value = serde_json::from_str(&diff_value).unwrap_or(serde_json::Value::Null);
652 let mut prev_json: serde_json::Value = serde_json::from_str(prev_value).unwrap_or(serde_json::Value::Null);
653 let patch_operations: Vec<json_patch::PatchOperation> = serde_json::from_value(patch).unwrap_or_default();
654 let _ = json_patch::patch(&mut prev_json, &patch_operations);
655 prev_json.to_string()
656 }
657 'T' => {
658 // Apply TLCP-diff
659 //tlcp_diff::apply_diff(prev_value, &diff_value).unwrap_or_else(|_| prev_value.to_string())
660 unimplemented!("Implement TLCP-diff");
661 }
662 _ => unreachable!(),
663 };
664 field_map.insert(field_name.to_string(), Some(new_value.to_string()));
665 }
666 }
667 field_index += 1;
668 }
669 _ => {
670 let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
671 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
672 field_map.insert(field_name.to_string(), Some(decoded_value));
673 }
674 field_index += 1;
675 }
676 }
677 }
678 value if value.starts_with('{') => {
679 // in this case it is a json payload that we will let the consumer handle. In this case, it is important
680 // to preserve casing for parsing.
681 let original_json = parse_arguments(&submessage).get(3).unwrap_or(&"").split('|').collect::<Vec<&str>>();
682 let mut payload = "";
683 for json in original_json.iter()
684 {
685 if json.is_empty() || json.to_string() == "#"
686 {
687 continue;
688 }
689
690 payload = json;
691 }
692
693 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
694 field_map.insert(field_name.to_string(), Some(payload.to_string()));
695 }
696 field_index += 1;
697 }
698 _ => {
699 let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string());
700 if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) {
701 field_map.insert(field_name.to_string(), Some(decoded_value));
702 }
703 field_index += 1;
704 }
705 }
706 }
707
708 // Store only item_update's changed fields.
709 let changed_fields: HashMap<String, String> = field_map.iter()
710 .filter_map(|(k, v)| {
711 if let Some(v) = v {
712 Some((k.clone(), v.clone()))
713 } else {
714 None
715 }
716 })
717 .collect();
718
719 //
720 // Take the proper item_update from item_updates and update it with changed fields.
721 // If the item_update doesn't exist yet, create a new one.
722 //
723 let current_item_update: ItemUpdate;
724 match subscription_item_updates.get_mut(&(subscription_index)) {
725 Some(item_updates) => match item_updates.get_mut(&(item_index)) {
726 Some(item_update) => {
727 //
728 // Iterate changed_fields and update existing item_update.fields assigning the new values.
729 //
730 for (field_name, new_value) in &changed_fields {
731 if item_update.fields.contains_key(field_name) {
732 item_update.fields.insert((*field_name).clone(), Some(new_value.clone()));
733 }
734 }
735 item_update.changed_fields = changed_fields;
736 item_update.is_snapshot = is_snapshot;
737 current_item_update = item_update.clone();
738 },
739 None => {
740 // Create a new item_update and add it to item_updates.
741 let item_update = ItemUpdate {
742 item_name: item.cloned(),
743 item_pos: item_index,
744 fields: field_map,
745 changed_fields: changed_fields,
746 is_snapshot: is_snapshot,
747 };
748 current_item_update = item_update.clone();
749 item_updates.insert(item_index, item_update);
750 }
751 },
752 None => {
753 // Create a new item_update and add it to item_updates.
754 let item_update = ItemUpdate {
755 item_name: item.cloned(),
756 item_pos: item_index,
757 fields: field_map,
758 changed_fields: changed_fields,
759 is_snapshot: is_snapshot,
760 };
761 current_item_update = item_update.clone();
762 let mut item_updates = HashMap::new();
763 item_updates.insert(item_index, item_update);
764 subscription_item_updates.insert(subscription_index, item_updates);
765 }
766 };
767
768 // Get mutable subscription listeners directly.
769 let subscription_listeners = subscription.get_listeners();
770
771 // Iterate subscription listeners and call on_item_update for each listener.
772 for listener in subscription_listeners {
773 listener.on_item_update(¤t_item_update);
774 }
775 }
776 //
777 // Connection confirmation from server.
778 //
779 "wsok" => {
780 self.make_log( Level::INFO, &format!("Connection confirmed by server: '{}'", clean_text) );
781 //
782 // Request session creation.
783 //
784 let ls_adapter_set = match self.connection_details.get_adapter_set() {
785 Some(adapter_set) => adapter_set,
786 None => {
787 return Err(Box::new(IllegalStateException::new(
788 "No adapter set found in connection details.",
789 )));
790 },
791 };
792 let ls_send_sync = self.connection_options.get_send_sync().to_string();
793 let mut params: Vec<(&str, &str)> = vec![
794 ("LS_adapter_set", &ls_adapter_set),
795 ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"),
796 ("LS_send_sync", &ls_send_sync),
797 ];
798 if let Some(user) = &self.connection_details.get_user() {
799 params.push(("LS_user", user));
800 }
801 if let Some(password) = &self.connection_details.get_password() {
802 params.push(("LS_password", password));
803 }
804 params.push(("LS_protocol", Self::TLCP_VERSION));
805 let encoded_params = serde_urlencoded::to_string(¶ms)?;
806 write_stream
807 .send(Message::Text(format!("create_session\r\n{}\n", encoded_params).into()))
808 .await?;
809 self.make_log( Level::DEBUG, &format!("Sent create session request: '{}'", encoded_params) );
810 },
811 unexpected_message => {
812 return Err(Box::new(std::io::Error::new(
813 std::io::ErrorKind::InvalidData,
814 format!(
815 "Unexpected message received from server: '{:?}'",
816 unexpected_message
817 ),
818 )));
819 },
820 }
821 }
822 },
823 Some(Ok(non_text_message)) => {
824 return Err(Box::new(std::io::Error::new(
825 std::io::ErrorKind::InvalidData,
826 format!(
827 "Unexpected non-text message from server: {:?}",
828 non_text_message
829 ),
830 )));
831 },
832 Some(Err(err)) => {
833 return Err(Box::new(std::io::Error::new(
834 std::io::ErrorKind::InvalidData,
835 format!("Error reading message from server: {}", err),
836 )));
837 },
838 None => {
839 self.make_log( Level::DEBUG, "No more messages from server" );
840 break;
841 },
842 }
843 },
844 Some(subscription_request) = self.subscription_receiver.recv() => {
845 println!("Received subscription/unsubscription request.");
846 request_id += 1;
847 // Process subscription requests.
848 if subscription_request.subscription.is_some()
849 {
850 self.subscriptions.push(subscription_request.subscription.unwrap());
851
852 // if we are not connected yet, we will subscribe later
853 if !is_connected {
854 continue;
855 }
856
857 subscription_id += 1;
858 self.subscriptions.last_mut().unwrap().id = subscription_id;
859 self.subscriptions.last().unwrap().id_sender.send(subscription_id)?;
860
861 let encoded_params = match Self::get_subscription_params(self.subscriptions.last().unwrap(), request_id)
862 {
863 Ok(params) => params,
864 Err(err) => {
865 return Err(err);
866 },
867 };
868
869 write_stream
870 .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
871 .await?;
872
873 self.make_log( Level::INFO, &format!("Sent subscription request: '{}'", encoded_params) );
874 }
875 // Process unsubscription requests.
876 else if subscription_request.subscription_id.is_some()
877 {
878 let unsubscription_id = subscription_request.subscription_id.unwrap();
879 let encoded_params = match Self::get_unsubscription_params(unsubscription_id, request_id)
880 {
881 Ok(params) => params,
882 Err(err) => {
883 return Err(err);
884 },
885 };
886
887 write_stream
888 .send(Message::Text(format!("control\r\n{}", encoded_params).into()))
889 .await?;
890
891 self.make_log( Level::INFO, &format!("Sent unsubscription request: '{}'", encoded_params) );
892
893 self.subscriptions.retain(|s| s.id != unsubscription_id);
894
895 if self.subscriptions.is_empty()
896 {
897 self.make_log( Level::INFO, &"No more subscriptions, disconnecting".to_string() );
898 shutdown_signal.notify_one();
899 }
900 }
901 },
902 _ = shutdown_signal.notified() => {
903 self.make_log( Level::INFO, &format!("Received shutdown signal") );
904 break;
905 },
906 }
907 }
908
909 Ok(())
910 }
911
912 /// Operation method that requests to close the Session opened against the configured Lightstreamer
913 /// Server (if any).
914 ///
915 /// When `disconnect()` is called, the "Stream-Sense" mechanism is stopped.
916 ///
917 /// Note that active `Subscription` instances, associated with this `LightstreamerClient` instance,
918 /// are preserved to be re-subscribed to on future Sessions.
919 ///
920 /// Note that the request to disconnect is accomplished by the client in a separate thread; this
921 /// means that an invocation to `getStatus()` right after `disconnect()` might not reflect the
922 /// change yet.
923 ///
924 /// When the request to disconnect is finally being executed, if the status of the client is
925 /// "DISCONNECTED", then nothing will be done.
926 ///
927 /// See also `connect()`
928 #[instrument]
929 pub async fn disconnect(&mut self) {
930 // Implementation for disconnect
931 self.make_log( Level::INFO, "Disconnecting from Lightstreamer server" );
932 }
933
934 /// Static inquiry method that can be used to share cookies between connections to the Server
935 /// (performed by this library) and connections to other sites that are performed by the application.
936 /// With this method, cookies received from the Server can be extracted for sending through other
937 /// connections, according with the URI to be accessed.
938 ///
939 /// See `addCookies()` for clarifications on when cookies are directly stored by the library and
940 /// when not.
941 ///
942 /// # Parameters
943 ///
944 /// * `uri`: the URI to which the cookies should be sent, or `None`.
945 ///
946 /// # Returns
947 ///
948 /// A list with the various cookies that can be sent in a HTTP request for the specified URI.
949 /// If a `None` URI was supplied, all available non-expired cookies will be returned.
950 pub fn get_cookies(_uri: Option<&str>) -> Cookie {
951 // Implementation for get_cookies
952 unimplemented!()
953 }
954
955 /// Returns a list containing the `ClientListener` instances that were added to this client.
956 ///
957 /// # Returns
958 ///
959 /// A list containing the listeners that were added to this client.
960 ///
961 /// See also `addListener()`
962 pub fn get_listeners(&self) -> &Vec<Box<dyn ClientListener>> {
963 &self.listeners
964 }
965
966 /// Inquiry method that gets the current client status and transport (when applicable).
967 ///
968 /// # Returns
969 ///
970 /// The current client status. It can be one of the following values:
971 ///
972 /// - `"CONNECTING"`: the client is waiting for a Server's response in order to establish a connection;
973 /// - `"CONNECTED:STREAM-SENSING"`: the client has received a preliminary response from the server
974 /// and is currently verifying if a streaming connection is possible;
975 /// - `"CONNECTED:WS-STREAMING"`: a streaming connection over WebSocket is active;
976 /// - `"CONNECTED:HTTP-STREAMING"`: a streaming connection over HTTP is active;
977 /// - `"CONNECTED:WS-POLLING"`: a polling connection over WebSocket is in progress;
978 /// - `"CONNECTED:HTTP-POLLING"`: a polling connection over HTTP is in progress;
979 /// - `"STALLED"`: the Server has not been sending data on an active streaming connection for
980 /// longer than a configured time;
981 /// - `"DISCONNECTED:WILL-RETRY"`: no connection is currently active but one will be opened
982 /// (possibly after a timeout);
983 /// - `"DISCONNECTED:TRYING-RECOVERY"`: no connection is currently active, but one will be opened
984 /// as soon as possible, as an attempt to recover the current session after a connection issue;
985 /// - `"DISCONNECTED"`: no connection is currently active.
986 ///
987 /// See also `ClientListener.onStatusChange()`
988 pub fn get_status(&self) -> &ClientStatus {
989 &self.status
990 }
991
992 /// Inquiry method that returns a list containing all the `Subscription` instances that are
993 /// currently "active" on this `LightstreamerClient`.
994 ///
995 /// Internal second-level `Subscription` are not included.
996 ///
997 /// # Returns
998 ///
999 /// A list, containing all the `Subscription` currently "active" on this `LightstreamerClient`.
1000 /// The list can be empty.
1001 ///
1002 /// See also `subscribe()`
1003 pub fn get_subscriptions(&self) -> &Vec<Subscription> {
1004 &self.subscriptions
1005 }
1006
1007 /// Creates a new instance of `LightstreamerClient`.
1008 ///
1009 /// The constructor initializes the client with the server address and adapter set, if provided.
1010 /// It sets up the connection details and options for the client. If no server address or
1011 /// adapter set is specified, those properties on the client will be `None`. This allows
1012 /// for late configuration of these details before connecting to the Lightstreamer server.
1013 ///
1014 /// # Arguments
1015 /// * `server_address` - An optional reference to a string slice that represents the server
1016 /// address to connect to. If `None`, the server address must be set later.
1017 /// * `adapter_set` - An optional reference to a string slice that specifies the adapter set name.
1018 /// If `None`, the adapter set must be set later.
1019 ///
1020 /// # Returns
1021 /// A result containing the new `LightstreamerClient` instance if successful, or an
1022 /// `IllegalStateException` if the initialization fails due to invalid state conditions.
1023 ///
1024 /// # Panics
1025 /// Does not panic under normal circumstances. However, unexpected internal errors during
1026 /// the creation of internal components could cause panics, which should be considered when
1027 /// using this function in production code.
1028 ///
1029 /// # Example
1030 /// ```
1031 /// // Example usage of `new` to create a LightstreamerClient with specified server address and
1032 /// // adapter set.
1033 /// use lightstreamer_client::ls_client::LightstreamerClient;
1034 /// let server_address = Some("http://myserver.com");
1035 /// let adapter_set = Some("MY_ADAPTER_SET");
1036 /// let ls_client = LightstreamerClient::new(server_address, adapter_set, None, None);
1037 /// assert!(ls_client.is_ok());
1038 /// ```
1039 pub fn new(
1040 server_address: Option<&str>,
1041 adapter_set: Option<&str>,
1042 username: Option<&str>,
1043 password: Option<&str>,
1044 ) -> Result<LightstreamerClient, Box<dyn Error>> {
1045 let connection_details =
1046 ConnectionDetails::new(server_address, adapter_set, username, password)?;
1047 let connection_options = ConnectionOptions::default();
1048 let (subscription_sender, subscription_receiver) = mpsc::channel(100);
1049
1050 Ok(LightstreamerClient {
1051 server_address: server_address.map(|s| s.to_string()),
1052 adapter_set: adapter_set.map(|s| s.to_string()),
1053 connection_details,
1054 connection_options,
1055 listeners: Vec::new(),
1056 subscriptions: Vec::new(),
1057 status: ClientStatus::Disconnected(DisconnectionType::WillRetry),
1058 logging: LogType::StdLogs,
1059 subscription_sender,
1060 subscription_receiver,
1061 })
1062 }
1063
1064 /// Removes a listener from the `LightstreamerClient` instance so that it will not receive
1065 /// events anymore.
1066 ///
1067 /// A listener can be removed at any time.
1068 ///
1069 /// # Parameters
1070 ///
1071 /// * `listener`: The listener to be removed.
1072 ///
1073 /// See also `addListener()`
1074 pub fn remove_listener(&mut self, _listener: Box<dyn ClientListener>) {
1075 unimplemented!("Implement mechanism to remove listener from LightstreamerClient");
1076 //self.listeners.remove(&listener);
1077 }
1078
1079 /// Operation method that sends a message to the Server. The message is interpreted and handled
1080 /// by the Metadata Adapter associated to the current Session. This operation supports in-order
1081 /// guaranteed message delivery with automatic batching. In other words, messages are guaranteed
1082 /// to arrive exactly once and respecting the original order, whatever is the underlying transport
1083 /// (HTTP or WebSockets). Furthermore, high frequency messages are automatically batched, if
1084 /// necessary, to reduce network round trips.
1085 ///
1086 /// Upon subsequent calls to the method, the sequential management of the involved messages is
1087 /// guaranteed. The ordering is determined by the order in which the calls to `sendMessage` are
1088 /// issued.
1089 ///
1090 /// If a message, for any reason, doesn't reach the Server (this is possible with the HTTP transport),
1091 /// it will be resent; however, this may cause the subsequent messages to be delayed. For this
1092 /// reason, each message can specify a "delayTimeout", which is the longest time the message,
1093 /// after reaching the Server, can be kept waiting if one of more preceding messages haven't
1094 /// been received yet. If the "delayTimeout" expires, these preceding messages will be discarded;
1095 /// any discarded message will be notified to the listener through `ClientMessageListener.onDiscarded()`.
1096 /// Note that, because of the parallel transport of the messages, if a zero or very low timeout
1097 /// is set for a message and the previous message was sent immediately before, it is possible
1098 /// that the latter gets discarded even if no communication issues occur. The Server may also
1099 /// enforce its own timeout on missing messages, to prevent keeping the subsequent messages for
1100 /// long time.
1101 ///
1102 /// Sequence identifiers can also be associated with the messages. In this case, the sequential
1103 /// management is restricted to all subsets of messages with the same sequence identifier associated.
1104 ///
1105 /// Notifications of the operation outcome can be received by supplying a suitable listener. The
1106 /// supplied listener is guaranteed to be eventually invoked; listeners associated with a sequence
1107 /// are guaranteed to be invoked sequentially.
1108 ///
1109 /// The "UNORDERED_MESSAGES" sequence name has a special meaning. For such a sequence, immediate
1110 /// processing is guaranteed, while strict ordering and even sequentialization of the processing
1111 /// is not enforced. Likewise, strict ordering of the notifications is not enforced. However,
1112 /// messages that, for any reason, should fail to reach the Server whereas subsequent messages
1113 /// had succeeded, might still be discarded after a server-side timeout, in order to ensure that
1114 /// the listener eventually gets a notification.
1115 ///
1116 /// Moreover, if "UNORDERED_MESSAGES" is used and no listener is supplied, a "fire and forget"
1117 /// scenario is assumed. In this case, no checks on missing, duplicated or overtaken messages
1118 /// are performed at all, so as to optimize the processing and allow the highest possible throughput.
1119 ///
1120 /// Since a message is handled by the Metadata Adapter associated to the current connection, a
1121 /// message can be sent only if a connection is currently active. If the special `enqueueWhileDisconnected`
1122 /// flag is specified it is possible to call the method at any time and the client will take
1123 /// care of sending the message as soon as a connection is available, otherwise, if the current
1124 /// status is "DISCONNECTED*", the message will be abandoned and the `ClientMessageListener.onAbort()`
1125 /// event will be fired.
1126 ///
1127 /// Note that, in any case, as soon as the status switches again to "DISCONNECTED*", any message
1128 /// still pending is aborted, including messages that were queued with the `enqueueWhileDisconnected`
1129 /// flag set to `true`.
1130 ///
1131 /// Also note that forwarding of the message to the server is made in a separate thread, hence,
1132 /// if a message is sent while the connection is active, it could be aborted because of a subsequent
1133 /// disconnection. In the same way a message sent while the connection is not active might be
1134 /// sent because of a subsequent connection.
1135 ///
1136 /// # Parameters
1137 ///
1138 /// * `message`: a text message, whose interpretation is entirely demanded to the Metadata Adapter
1139 /// associated to the current connection.
1140 /// * `sequence`: an alphanumeric identifier, used to identify a subset of messages to be managed
1141 /// in sequence; underscore characters are also allowed. If the "UNORDERED_MESSAGES" identifier
1142 /// is supplied, the message will be processed in the special way described above. The parameter
1143 /// is optional; if set to `None`, "UNORDERED_MESSAGES" is used as the sequence name.
1144 /// * `delay_timeout`: a timeout, expressed in milliseconds. If higher than the Server configured
1145 /// timeout on missing messages, the latter will be used instead. The parameter is optional; if
1146 /// a negative value is supplied, the Server configured timeout on missing messages will be applied.
1147 /// This timeout is ignored for the special "UNORDERED_MESSAGES" sequence, although a server-side
1148 /// timeout on missing messages still applies.
1149 /// * `listener`: an object suitable for receiving notifications about the processing outcome. The
1150 /// parameter is optional; if not supplied, no notification will be available.
1151 /// * `enqueue_while_disconnected`: if this flag is set to `true`, and the client is in a disconnected
1152 /// status when the provided message is handled, then the message is not aborted right away but
1153 /// is queued waiting for a new session. Note that the message can still be aborted later when
1154 /// a new session is established.
1155 pub fn send_message(
1156 &mut self,
1157 message: &str,
1158 sequence: Option<&str>,
1159 _delay_timeout: Option<u64>,
1160 listener: Option<Box<dyn ClientMessageListener>>,
1161 enqueue_while_disconnected: bool,
1162 ) {
1163 let _sequence = sequence.unwrap_or_else(|| "UNORDERED_MESSAGES");
1164
1165 // Handle the message based on the current connection status
1166 match &self.status {
1167 ClientStatus::Connected(_connection_type) => {
1168 // Send the message to the server in a separate thread
1169 // ...
1170 }
1171 ClientStatus::Disconnected(_disconnection_type) => {
1172 if enqueue_while_disconnected {
1173 // Enqueue the message to be sent when a connection is available
1174 // ...
1175 } else {
1176 // Abort the message and notify the listener
1177 if let Some(listener) = listener {
1178 listener.on_abort(message, false);
1179 }
1180 }
1181 }
1182 _ => {
1183 // Enqueue the message to be sent when a connection is available
1184 // ...
1185 }
1186 }
1187 unimplemented!("Complete mechanism to send message to LightstreamerClient.");
1188 }
1189
/// Static method meant to configure the logging system used by the library. The logging
/// system must respect the `LoggerProvider` interface; a custom class can be used to wrap
/// any third-party logging system.
///
/// If no logging system is specified, all the generated log is discarded.
///
/// Log categories available to be consumed:
///
/// - `lightstreamer.stream`: socket activity on Lightstreamer Server connections; socket
///   operations at INFO level, read/write data exchange at DEBUG level.
/// - `lightstreamer.protocol`: requests to Lightstreamer Server and Server answers; requests
///   at INFO level, request details and events from the Server at DEBUG level.
/// - `lightstreamer.session`: Server Session lifecycle events; the events at INFO level,
///   their details at DEBUG level.
/// - `lightstreamer.subscriptions`: subscription requests received by the clients and the
///   related updates; alert events from the Server at WARN level, subscriptions and
///   unsubscriptions at INFO level, requests batching and update details at DEBUG level.
/// - `lightstreamer.actions`: settings / API calls.
///
/// # Parameters
///
/// * `provider`: A `LoggerProvider` instance that will be used to generate log messages by
///   the library classes. The current signature does not accept this argument yet.
///
/// # Panics
///
/// Always: the method is a stub and unconditionally panics through `unimplemented!`.
pub fn set_logger_provider() {
    // Stub: no logger provider can be installed yet.
    unimplemented!("Implement mechanism to set logger provider for LightstreamerClient.");
}
1216 /*
1217 pub fn set_logger_provider(provider: LoggerProvider) {
1218 // Implementation for set_logger_provider
1219 }
1220 */
1221
/// Meant to provide a way to control how TLS certificates are evaluated, with the
/// possibility to accept untrusted ones.
///
/// May be called only once before creating any `LightstreamerClient` instance.
///
/// # Parameters
///
/// * `factory`: an instance of `ssl.SSLContext`. The current signature does not accept this
///   argument yet.
///
/// # Raises
///
/// * `IllegalArgumentException`: if the factory is `None`
/// * `IllegalStateException`: if a factory is already installed
///
/// # Panics
///
/// Always: the method is a stub and unconditionally panics through `unimplemented!`, so none
/// of the conditions listed above is checked yet.
pub fn set_trust_manager_factory() {
    // Stub: no trust manager factory can be installed yet.
    unimplemented!("Implement mechanism to set trust manager factory for LightstreamerClient.");
}
1238 /*
1239 pub fn set_trust_manager_factory(factory: Option<SslContext>) -> Result<(), IllegalArgumentException> {
1240 if factory.is_none() {
1241 return Err(IllegalArgumentException::new(
1242 "Factory cannot be None",
1243 ));
1244 }
1245
1246 // Implementation for set_trust_manager_factory
1247 Ok(())
1248 }
1249 */
1250
1251 /// Adds a subscription to the `LightstreamerClient` instance.
1252 ///
1253 /// Active subscriptions are subscribed to through the server as soon as possible (i.e. as soon
1254 /// as there is a session available). Active `Subscription` are automatically persisted across different
1255 /// sessions as long as a related unsubscribe call is not issued.
1256 ///
1257 /// Subscriptions can be given to the `LightstreamerClient` at any time. Once done the `Subscription`
1258 /// immediately enters the "active" state.
1259 ///
1260 /// Once "active", a `Subscription` instance cannot be provided again to a `LightstreamerClient`
1261 /// unless it is first removed from the "active" state through a call to `unsubscribe()`.
1262 ///
1263 /// Also note that forwarding of the subscription to the server is made in a separate thread.
1264 ///
1265 /// A successful subscription to the server will be notified through a `SubscriptionListener.onSubscription()`
1266 /// event.
1267 ///
1268 /// # Parameters
1269 ///
1270 /// * `subsrciption_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1271 /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1272 /// values.
1273 ///
1274 /// See also `unsubscribe()`
1275 pub fn subscribe(subscription_sender: Sender<SubscriptionRequest>, subscription: Subscription) {
1276 subscription_sender
1277 .try_send(SubscriptionRequest {
1278 subscription: Some(subscription),
1279 subscription_id: None,
1280 })
1281 .unwrap()
1282 }
1283
1284 /// If you want to be able to unsubscribe from a subscription, you need to keep track of the id
1285 /// of the subscription. This blocking method allows you to wait for the id of the subscription
1286 /// to be returned.
1287 ///
1288 /// # Parameters
1289 ///
1290 /// * `subscription_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1291 /// * `subscription`: A `Subscription` object, carrying all the information needed to process real-time
1292 ///
1293 pub async fn subscribe_get_id(
1294 subscription_sender: Sender<SubscriptionRequest>,
1295 subscription: Subscription,
1296 ) -> Result<usize, Box<dyn Error + Send + Sync>> {
1297 let mut id_receiver = subscription.id_receiver.clone();
1298 LightstreamerClient::subscribe(subscription_sender.clone(), subscription);
1299
1300 match id_receiver.changed().await {
1301 Ok(_) => Ok(*id_receiver.borrow()),
1302 Err(_) => Err(Box::new(IllegalStateException::new(
1303 "Failed to get subscription id",
1304 ))),
1305 }
1306 }
1307
1308 /// Operation method that removes a `Subscription` that is currently in the "active" state.
1309 ///
1310 /// By bringing back a `Subscription` to the "inactive" state, the unsubscription from all its
1311 /// items is requested to Lightstreamer Server.
1312 ///
1313 /// Subscription can be unsubscribed from at any time. Once done the `Subscription` immediately
1314 /// exits the "active" state.
1315 ///
1316 /// Note that forwarding of the unsubscription to the server is made in a separate thread.
1317 ///
1318 /// The unsubscription will be notified through a `SubscriptionListener.onUnsubscription()` event.
1319 ///
1320 /// # Parameters
1321 ///
1322 /// * `subsrciption_sender`: A `Sender` object that sends a `SubscriptionRequest` to the `LightstreamerClient`
1323 /// * `subscription_id`: The id of the subscription to be unsubscribed from.
1324 /// instance.
1325 pub fn unsubscribe(
1326 subscription_sender: Sender<SubscriptionRequest>,
1327 subscription_id: usize,
1328 ) {
1329 println!("Unsubscribing subscription with id: {}", subscription_id);
1330 subscription_sender
1331 .try_send(SubscriptionRequest {
1332 subscription: None,
1333 subscription_id: Some(subscription_id),
1334 })
1335 .unwrap()
1336 }
1337
1338 /// Method setting enum for the logging of this instance.
1339 ///
1340 /// Default logging type is StdLogs, corresponding to `stdout`
1341 ///
1342 /// `LightstreamerClient` has methods for logging that are compatible with the `Tracing` crate.
1343 /// Enabling logging for the `Tracing` crate requires implementation of a tracing subscriber
1344 /// and its configuration and formatting.
1345 ///
1346 /// # Parameters
1347 ///
1348 /// * `logging`: An enum declaring the logging type of this `LightstreamerClient` instance.
1349 pub fn set_logging_type(&mut self, logging: LogType) {
1350 self.logging = logging;
1351 }
1352
1353 /// Method for logging messages
1354 ///
1355 /// Match case wraps log types. `loglevel` param ignored in StdLogs case, all output to stdout.
1356 ///
1357 /// # Parameters
1358 ///
1359 /// * `loglevel` Enum determining use of stdout or Tracing subscriber.
1360 pub fn make_log(&mut self, loglevel: Level, log: &str) {
1361 match self.logging {
1362 LogType::StdLogs => {
1363 println!("{}", log);
1364 }
1365 LogType::TracingLogs => match loglevel {
1366 Level::INFO => {
1367 info!(log);
1368 }
1369 Level::WARN => {
1370 warn!(log);
1371 }
1372 Level::ERROR => {
1373 error!(log);
1374 }
1375 Level::TRACE => {
1376 trace!(log);
1377 }
1378 Level::DEBUG => {
1379 debug!(log);
1380 }
1381 },
1382 }
1383 }
1384}
1385
/// The transport type to be used by the client.
///
/// Every variant restricts the client to the named kind of connection; if that kind of
/// connection is not possible because of the environment, the client will not connect at
/// all.
///
/// The enum is a plain field-less value, so it is `Copy` and can be compared, hashed and
/// used as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Transport {
    /// WS: the Stream-Sense algorithm is enabled as in the `None` case but the client will
    /// only use WebSocket based connections.
    Ws,
    /// HTTP: the Stream-Sense algorithm is enabled as in the `None` case but the client will
    /// only use HTTP based connections.
    Http,
    /// WS-STREAMING: the Stream-Sense algorithm is disabled and the client will only connect
    /// on Streaming over WebSocket.
    WsStreaming,
    /// HTTP-STREAMING: the Stream-Sense algorithm is disabled and the client will only
    /// connect on Streaming over HTTP.
    HttpStreaming,
    /// WS-POLLING: the Stream-Sense algorithm is disabled and the client will only connect
    /// on Polling over WebSocket.
    WsPolling,
    /// HTTP-POLLING: the Stream-Sense algorithm is disabled and the client will only connect
    /// on Polling over HTTP.
    HttpPolling,
}