lightstreamer_rs/subscription/model.rs
1// Copyright (C) 2024 Joaquín Béjar García
2// Portions of this file are derived from lightstreamer-client
3// Copyright (C) 2024 Daniel López Azaña
4// Original project: https://github.com/daniloaz/lightstreamer-client
5//
6// This file is part of lightstreamer-rs.
7//
8// lightstreamer-rs is free software: you can redistribute it and/or modify
9// it under the terms of the GNU General Public License as published by
10// the Free Software Foundation, either version 3 of the License, or
11// (at your option) any later version.
12//
13// lightstreamer-rs is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16// GNU General Public License for more details.
17//
18// You should have received a copy of the GNU General Public License
19// along with lightstreamer-rs. If not, see <https://www.gnu.org/licenses/>.
20
21use crate::subscription::SubscriptionListener;
22use crate::utils::LightstreamerError;
23use std::collections::HashMap;
24use std::fmt::{self, Debug, Formatter};
25use tokio::sync::mpsc::{Receiver, Sender, channel};
26
27/// Enum representing the snapshot delivery preferences to be requested to Lightstreamer Server for the items in the Subscription.
28#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
29#[repr(u8)]
30pub enum Snapshot {
31 /// Request the full snapshot for the subscribed items.
32 Yes,
33 /// Do not request any snapshot for the subscribed items.
34 No,
35 /// Request a snapshot with a specific length (number of updates).
36 Number(usize),
37 /// Default value. No snapshot preference will be sent to the server.
38 #[default]
39 None,
40}
41
42impl Default for &Snapshot {
43 fn default() -> Self {
44 &Snapshot::None
45 }
46}
47
48impl fmt::Display for Snapshot {
49 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
50 match self {
51 Snapshot::Yes => write!(f, "true"),
52 Snapshot::No => write!(f, "false"),
53 Snapshot::Number(n) => write!(f, "{}", n),
54 Snapshot::None => write!(f, ""),
55 }
56 }
57}
58
59/// Enum representing the subscription mode.
60#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
61#[repr(u8)]
62pub enum SubscriptionMode {
63 /// MERGE mode. The server sends an update for a specific item only if the state of at least one of the fields has changed.
64 Merge,
65 /// DISTINCT mode. The server sends an update for an item every time new data for that item is available.
66 Distinct,
67 /// RAW mode. The server forwards any update for an item that it receives, without performing any processing.
68 Raw,
69 /// COMMAND mode. The server sends updates based on add, update, and delete commands.
70 Command,
71}
72
73impl fmt::Display for SubscriptionMode {
74 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
75 match self {
76 SubscriptionMode::Merge => write!(f, "MERGE"),
77 SubscriptionMode::Distinct => write!(f, "DISTINCT"),
78 SubscriptionMode::Command => write!(f, "COMMAND"),
79 SubscriptionMode::Raw => write!(f, "RAW"),
80 }
81 }
82}
83
84/// Struct representing a Subscription to be submitted to a Lightstreamer Server.
85/// It contains subscription details and the listeners needed to process the real-time data.
86pub struct Subscription {
87 /// The subscription mode for the items, required by Lightstreamer Server.
88 mode: SubscriptionMode,
89 /// An array of items to be subscribed to through Lightstreamer server.
90 items: Option<Vec<String>>,
91 /// An "Item Group" identifier representing a list of items.
92 item_group: Option<String>,
93 /// An array of fields for the items to be subscribed to through Lightstreamer Server.
94 fields: Option<Vec<String>>,
95 /// A "Field Schema" identifier representing a list of fields.
96 field_schema: Option<String>,
97 /// The name of the Data Adapter that supplies all the items for this Subscription.
98 data_adapter: Option<String>,
99 /// The name of the second-level Data Adapter for a COMMAND Subscription.
100 command_second_level_data_adapter: Option<String>,
101 /// The "Field List" to be subscribed to through Lightstreamer Server for the second-level items in a COMMAND Subscription.
102 command_second_level_fields: Option<Vec<String>>,
103 /// The "Field Schema" to be subscribed to through Lightstreamer Server for the second-level items in a COMMAND Subscription.
104 command_second_level_field_schema: Option<String>,
105 /// The length to be requested to Lightstreamer Server for the internal queuing buffers for the items in the Subscription.
106 requested_buffer_size: Option<usize>,
107 /// The maximum update frequency to be requested to Lightstreamer Server for all the items in the Subscription.
108 requested_max_frequency: Option<f64>,
109 /// The snapshot delivery preferences to be requested to Lightstreamer Server for the items in the Subscription.
110 requested_snapshot: Option<Snapshot>,
111 /// The selector name for all the items in the Subscription, used as a filter on the updates received.
112 selector: Option<String>,
113 /// A list of SubscriptionListener instances that will receive events from this Subscription.
114 listeners: Vec<Box<dyn SubscriptionListener>>,
115 /// A HashMap storing the latest values received for each item/field pair.
116 values: HashMap<(usize, usize), String>,
117 /// A HashMap storing the latest values received for each key/field pair in a COMMAND Subscription.
118 command_values: HashMap<String, HashMap<usize, String>>,
119 /// A flag indicating whether the Subscription is currently active or not.
120 is_active: bool,
121 /// A flag indicating whether the Subscription is currently subscribed to through the server or not.
122 is_subscribed: bool,
123 /// Client assigned subscription ID.
124 pub(crate) id: usize,
125 /// A channel sender to send the subscription ID to the Lightstreamer client.
126 pub(crate) id_sender: Sender<usize>,
127 /// A channel receiver to receive the subscription ID from the Lightstreamer client.
128 pub(crate) id_receiver: Receiver<usize>,
129}
130
131impl Subscription {
132 /// Constructor for creating a new Subscription instance.
133 ///
134 /// # Parameters
135 /// - `mode`: The subscription mode for the items, required by Lightstreamer Server.
136 /// - `items`: An array of items to be subscribed to through Lightstreamer server. It is also possible to specify the "Item List" or "Item Group" later.
137 /// - `fields`: An array of fields for the items to be subscribed to through Lightstreamer Server. It is also possible to specify the "Field List" or "Field Schema" later.
138 ///
139 /// # Errors
140 /// Returns an error if no items or fields are provided.
141 pub fn new(
142 mode: SubscriptionMode,
143 items: Option<Vec<String>>,
144 fields: Option<Vec<String>>,
145 ) -> Result<Subscription, LightstreamerError> {
146 if items.is_none() || fields.is_none() {
147 return Err(LightstreamerError::invalid_argument(
148 "Items and fields must be provided",
149 ));
150 }
151
152 // Channel capacity of 2 ensures subscription ID updates are not lost even if
153 // processing is slightly delayed. The ID is sent once per subscription lifecycle.
154 let (id_sender, id_receiver) = channel(2);
155
156 Ok(Subscription {
157 mode,
158 items,
159 item_group: None,
160 fields,
161 field_schema: None,
162 data_adapter: None,
163 command_second_level_data_adapter: None,
164 command_second_level_fields: None,
165 command_second_level_field_schema: None,
166 requested_buffer_size: None,
167 requested_max_frequency: None,
168 requested_snapshot: None,
169 selector: None,
170 listeners: Vec::new(),
171 values: HashMap::new(),
172 command_values: HashMap::new(),
173 is_active: false,
174 is_subscribed: false,
175 id: 0,
176 id_sender,
177 id_receiver,
178 })
179 }
180
181 /// Adds a listener that will receive events from the Subscription instance.
182 ///
183 /// The same listener can be added to several different Subscription instances.
184 ///
185 /// # Lifecycle
186 /// A listener can be added at any time. A call to add a listener already present will be ignored.
187 ///
188 /// # Parameters
189 /// - `listener`: An object that will receive the events as documented in the SubscriptionListener interface.
190 ///
191 /// # See also
192 /// `removeListener()`
193 pub fn add_listener(&mut self, listener: Box<dyn SubscriptionListener>) {
194 self.listeners.push(listener);
195 }
196
197 /// Removes a listener from the Subscription instance so that it will not receive events anymore.
198 ///
199 /// # Lifecycle
200 /// A listener can be removed at any time.
201 ///
202 /// # Parameters
203 /// - `listener`: The listener to be removed.
204 ///
205 /// # See also
206 /// `addListener()`
207 pub fn remove_listener<T>(&mut self, listener: &T)
208 where
209 T: SubscriptionListener,
210 {
211 self.listeners.retain(|l| {
212 let l_ref = l.as_ref() as &dyn SubscriptionListener;
213 let listener_ref = listener as &dyn SubscriptionListener;
214 std::ptr::addr_of!(*l_ref) != std::ptr::addr_of!(*listener_ref)
215 });
216 }
217
218 /// Returns a list containing the SubscriptionListener instances that were added to this client.
219 ///
220 /// # Returns
221 /// A list containing the listeners that were added to this client.
222 ///
223 /// # See also
224 /// `addListener()`
225 pub fn get_listeners(&self) -> &Vec<Box<dyn SubscriptionListener>> {
226 &self.listeners
227 }
228
229 /// Inquiry method that can be used to read the mode specified for this Subscription.
230 ///
231 /// # Lifecycle
232 /// This method can be called at any time.
233 ///
234 /// # Returns
235 /// The Subscription mode specified in the constructor.
236 pub fn get_mode(&self) -> &SubscriptionMode {
237 &self.mode
238 }
239
240 /// Setter method that sets the "Item Group" to be subscribed to through Lightstreamer Server.
241 ///
242 /// Any call to this method will override any "Item List" or "Item Group" previously specified.
243 ///
244 /// # Lifecycle
245 /// This method can only be called while the Subscription instance is in its "inactive" state.
246 ///
247 /// # Errors
248 /// Returns an error if the Subscription is currently "active".
249 ///
250 /// # Parameters
251 /// - `group`: A String to be expanded into an item list by the Metadata Adapter.
252 pub fn set_item_group(&mut self, group: String) -> Result<(), String> {
253 if self.is_active {
254 return Err("Subscription is active. This method can only be called while the Subscription instance is in its 'inactive' state.".to_string());
255 }
256 self.item_group = Some(group);
257 Ok(())
258 }
259
260 /// Inquiry method that can be used to read the item group specified for this Subscription.
261 ///
262 /// # Lifecycle
263 /// This method can only be called if the Subscription has been initialized using an "Item Group"
264 ///
265 /// # Returns
266 /// The "Item Group" to be subscribed to through the server, or `None` if the Subscription was initialized with an "Item List" or was not initialized at all.
267 pub fn get_item_group(&self) -> Option<&String> {
268 self.item_group.as_ref()
269 }
270
271 /// Setter method that sets the "Item List" to be subscribed to through Lightstreamer Server.
272 ///
273 /// Any call to this method will override any "Item List" or "Item Group" previously specified.
274 ///
275 /// # Lifecycle
276 /// This method can only be called while the Subscription instance is in its "inactive" state.
277 ///
278 /// # Errors
279 /// - Returns an error if the Subscription is currently "active".
280 /// - Returns an error if any of the item names in the "Item List" contains a space, is a number, or is empty/None.
281 ///
282 /// # Parameters
283 /// - `items`: An array of items to be subscribed to through the server.
284 pub fn set_items(&mut self, items: Vec<String>) -> Result<(), String> {
285 if self.is_active {
286 return Err("Subscription is active".to_string());
287 }
288 for item in &items {
289 if item.contains(" ") || item.parse::<usize>().is_ok() || item.is_empty() {
290 return Err("Invalid item name".to_string());
291 }
292 }
293 self.items = Some(items);
294 Ok(())
295 }
296
297 /// Inquiry method that can be used to read the "Item List" specified for this Subscription.
298 /// Note that if the single-item-constructor was used, this method will return an array of length 1 containing such item.
299 ///
300 /// # Lifecycle
301 /// This method can only be called if the Subscription has been initialized with an "Item List".
302 ///
303 /// # Returns
304 /// The "Item List" to be subscribed to through the server, or `None` if the Subscription was initialized with an "Item Group" or was not initialized at all.
305 pub fn get_items(&self) -> Option<&Vec<String>> {
306 self.items.as_ref()
307 }
308
309 /// Setter method that sets the "Field Schema" to be subscribed to through Lightstreamer Server.
310 ///
311 /// Any call to this method will override any "Field List" or "Field Schema" previously specified.
312 ///
313 /// # Lifecycle
314 /// This method can only be called while the Subscription instance is in its "inactive" state.
315 ///
316 /// # Errors
317 /// Returns an error if the Subscription is currently "active".
318 ///
319 /// # Parameters
320 /// - `schema`: A String to be expanded into a field list by the Metadata Adapter.
321 pub fn set_field_schema(&mut self, schema: String) -> Result<(), String> {
322 if self.is_active {
323 return Err("Subscription is active".to_string());
324 }
325 self.field_schema = Some(schema);
326 Ok(())
327 }
328
329 /// Inquiry method that can be used to read the field schema specified for this Subscription.
330 ///
331 /// # Lifecycle
332 /// This method can only be called if the Subscription has been initialized using a "Field Schema"
333 ///
334 /// # Returns
335 /// The "Field Schema" to be subscribed to through the server, or `None` if the Subscription was initialized with a "Field List" or was not initialized at all.
336 pub fn get_field_schema(&self) -> Option<&String> {
337 self.field_schema.as_ref()
338 }
339
340 /// Setter method that sets the "Field List" to be subscribed to through Lightstreamer Server.
341 ///
342 /// Any call to this method will override any "Field List" or "Field Schema" previously specified.
343 ///
344 /// # Lifecycle
345 /// This method can only be called while the Subscription instance is in its "inactive" state.
346 ///
347 /// # Errors
348 /// - Returns an error if the Subscription is currently "active".
349 /// - Returns an error if any of the field names in the list contains a space or is empty/None.
350 ///
351 /// # Parameters
352 /// - `fields`: An array of fields to be subscribed to through the server.
353 pub fn set_fields(&mut self, fields: Vec<String>) -> Result<(), String> {
354 if self.is_active {
355 return Err("Subscription is active".to_string());
356 }
357 for field in &fields {
358 if field.contains(" ") || field.is_empty() {
359 return Err("Invalid field name".to_string());
360 }
361 }
362 self.fields = Some(fields);
363 Ok(())
364 }
365
366 /// Inquiry method that can be used to read the "Field List" specified for this Subscription.
367 ///
368 /// # Lifecycle
369 /// This method can only be called if the Subscription has been initialized using a "Field List".
370 ///
371 /// # Returns
372 /// The "Field List" to be subscribed to through the server, or `None` if the Subscription was initialized with a "Field Schema" or was not initialized at all.
373 pub fn get_fields(&self) -> Option<&Vec<String>> {
374 self.fields.as_ref()
375 }
376
377 /// Setter method that sets the name of the Data Adapter (within the Adapter Set used by the current session) that supplies all the items for this Subscription.
378 ///
379 /// The Data Adapter name is configured on the server side through the "name" attribute of the `<data_provider>` element, in the "adapters.xml" file that defines the Adapter Set (a missing attribute configures the "DEFAULT" name).
380 ///
381 /// Note that if more than one Data Adapter is needed to supply all the items in a set of items, then it is not possible to group all the items of the set in a single Subscription. Multiple Subscriptions have to be defined.
382 ///
383 /// # Default
384 /// The default Data Adapter for the Adapter Set, configured as "DEFAULT" on the Server.
385 ///
386 /// # Lifecycle
387 /// This method can only be called while the Subscription instance is in its "inactive" state.
388 ///
389 /// # Errors
390 /// Returns an error if the Subscription is currently "active".
391 ///
392 /// # Parameters
393 /// - `adapter`: The name of the Data Adapter. A `None` value is equivalent to the "DEFAULT" name.
394 ///
395 /// # See also
396 /// `ConnectionDetails.setAdapterSet()`
397 pub fn set_data_adapter(&mut self, adapter: Option<String>) -> Result<(), String> {
398 if self.is_active {
399 return Err("Subscription is active".to_string());
400 }
401 self.data_adapter = adapter;
402 Ok(())
403 }
404
405 /// Inquiry method that can be used to read the name of the Data Adapter specified for this Subscription through `setDataAdapter()`.
406 ///
407 /// # Lifecycle
408 /// This method can be called at any time.
409 ///
410 /// # Returns
411 /// The name of the Data Adapter; returns `None` if no name has been configured, so that the "DEFAULT" Adapter Set is used.
412 pub fn get_data_adapter(&self) -> Option<&String> {
413 self.data_adapter.as_ref()
414 }
415
416 /// Setter method that sets the name of the second-level Data Adapter (within the Adapter Set used by the current session)
417 /// Setter method that sets the name of the second-level Data Adapter (within the Adapter Set used by the current session) that supplies all the second-level items for a COMMAND Subscription.
418 ///
419 /// All the possible second-level items should be supplied in "MERGE" mode with snapshot available.
420 ///
421 /// The Data Adapter name is configured on the server side through the "name" attribute of the `<data_provider>` element, in the "adapters.xml" file that defines the Adapter Set (a missing attribute configures the "DEFAULT" name).
422 ///
423 /// # Default
424 /// The default Data Adapter for the Adapter Set, configured as "DEFAULT" on the Server.
425 ///
426 /// # Lifecycle
427 /// This method can only be called while the Subscription instance is in its "inactive" state.
428 ///
429 /// # Errors
430 /// - Returns an error if the Subscription is currently "active".
431 /// - Returns an error if the Subscription mode is not "COMMAND".
432 ///
433 /// # Parameters
434 /// - `adapter`: The name of the Data Adapter. A `None` value is equivalent to the "DEFAULT" name.
435 ///
436 /// # See also
437 /// `Subscription.setCommandSecondLevelFields()`
438 ///
439 /// # See also
440 /// `Subscription.setCommandSecondLevelFieldSchema()`
441 pub fn set_command_second_level_data_adapter(
442 &mut self,
443 adapter: Option<String>,
444 ) -> Result<(), String> {
445 if self.is_active {
446 return Err("Subscription is active".to_string());
447 }
448 if self.mode != SubscriptionMode::Command {
449 return Err("Subscription mode is not Command".to_string());
450 }
451 self.command_second_level_data_adapter = adapter;
452 Ok(())
453 }
454
455 /// Inquiry method that can be used to read the name of the second-level Data Adapter specified for this Subscription through `setCommandSecondLevelDataAdapter()`.
456 ///
457 /// # Lifecycle
458 /// This method can be called at any time.
459 ///
460 /// # Errors
461 /// Returns an error if the Subscription mode is not COMMAND.
462 ///
463 /// # Returns
464 /// The name of the second-level Data Adapter.
465 ///
466 /// # See also
467 /// `setCommandSecondLevelDataAdapter()`
468 pub fn get_command_second_level_data_adapter(&self) -> Option<&String> {
469 if self.mode != SubscriptionMode::Command {
470 return None;
471 }
472 self.command_second_level_data_adapter.as_ref()
473 }
474
475 /// Setter method that sets the "Field Schema" to be subscribed to through Lightstreamer Server for the second-level items. It can only be used on COMMAND Subscriptions.
476 ///
477 /// Any call to this method will override any "Field List" or "Field Schema" previously specified for the second-level.
478 ///
479 /// Calling this method enables the two-level behavior:
480 ///
481 /// In synthesis, each time a new key is received on the COMMAND Subscription, the key value is treated as an Item name and an underlying Subscription for this Item is created and subscribed to automatically, to feed fields specified by this method. This mono-item Subscription is specified through an "Item List" containing only the Item name received. As a consequence, all the conditions provided for subscriptions through Item Lists have to be satisfied. The item is subscribed to in "MERGE" mode, with snapshot request and with the same maximum frequency setting as for the first-level items (including the "unfiltered" case). All other Subscription properties are left as the default. When the key is deleted by a DELETE command on the first-level Subscription, the associated second-level Subscription is also unsubscribed from.
482 ///
483 /// Specify `None` as parameter will disable the two-level behavior.
484 ///
485 /// # Lifecycle
486 /// This method can only be called while the Subscription instance is in its "inactive" state.
487 ///
488 /// # Errors
489 /// - Returns an error if the Subscription is currently "active".
490 /// - Returns an error if the Subscription mode is not "COMMAND".
491 ///
492 /// # Parameters
493 /// - `schema`: A String to be expanded into a field list by the Metadata Adapter.
494 ///
495 /// # See also
496 /// `Subscription.setCommandSecondLevelFields()`
497 pub fn set_command_second_level_field_schema(
498 &mut self,
499 schema: Option<String>,
500 ) -> Result<(), String> {
501 if self.is_active {
502 return Err("Subscription is active".to_string());
503 }
504 if self.mode != SubscriptionMode::Command {
505 return Err("Subscription mode is not Command".to_string());
506 }
507 self.command_second_level_field_schema = schema;
508 Ok(())
509 }
510
511 /// Inquiry method that can be used to read the "Field Schema" specified for second-level Subscriptions.
512 ///
513 /// # Lifecycle
514 /// This method can only be called if the second-level of this Subscription has been initialized using a "Field Schema".
515 ///
516 /// # Errors
517 /// Returns an error if the Subscription mode is not COMMAND.
518 ///
519 /// # Returns
520 /// The "Field Schema" to be subscribed to through the server, or `None` if the Subscription was initialized with a "Field List" or was not initialized at all.
521 ///
522 /// # See also
523 /// `Subscription.setCommandSecondLevelFieldSchema()`
524 pub fn get_command_second_level_field_schema(&self) -> Option<&String> {
525 if self.mode != SubscriptionMode::Command {
526 return None;
527 }
528 self.command_second_level_field_schema.as_ref()
529 }
530
531 /// Setter method that sets the "Field List" to be subscribed to through Lightstreamer Server for the second-level items. It can only be used on COMMAND Subscriptions.
532 ///
533 /// Any call to this method will override any "Field List" or "Field Schema" previously specified for the second-level.
534 ///
535 /// Calling this method enables the two-level behavior:
536 ///
537 /// In synthesis, each time a new key is received on the COMMAND Subscription, the key value is treated as an Item name and an underlying Subscription for this Item is created and subscribed to automatically, to feed fields specified by this method. This mono-item Subscription is specified through an "Item List" containing only the Item name received. As a consequence, all the conditions provided for subscriptions through Item Lists have to be satisfied. The item is subscribed to in "MERGE" mode, with snapshot request and with the same maximum frequency setting as for the first-level items (including the "unfiltered" case). All other Subscription properties are left as the default. When the key is deleted by a DELETE command on the first-level Subscription, the associated second-level Subscription is also unsubscribed from.
538 ///
539 /// Specifying `None` as parameter will disable the two-level behavior.
540 ///
541 /// # Lifecycle
542 /// This method can only be called while the Subscription instance is in its "inactive" state.
543 ///
544 /// # Errors
545 /// - Returns an error if the Subscription is currently "active".
546 /// - Returns an error if the Subscription mode is not "COMMAND".
547 /// - Returns an error if any of the field names in the "Field List" contains a space or is empty/None.
548 ///
549 /// # Parameters
550 /// - `fields`: An array of Strings containing a list of fields to be subscribed to through the server. Ensure that no name conflict is generated between first-level and second-level fields. In case of conflict, the second-level field will not be accessible by name, but only by position.
551 ///
552 /// # See also
553 /// `Subscription.setCommandSecondLevelFieldSchema()`
554 pub fn set_command_second_level_fields(
555 &mut self,
556 fields: Option<Vec<String>>,
557 ) -> Result<(), String> {
558 if self.is_active {
559 return Err("Subscription is active".to_string());
560 }
561 if self.mode != SubscriptionMode::Command {
562 return Err("Subscription mode is not Command".to_string());
563 }
564 if let Some(ref fields) = fields {
565 for field in fields {
566 if field.contains(" ") || field.is_empty() {
567 return Err("Invalid field name".to_string());
568 }
569 }
570 }
571 self.command_second_level_fields = fields;
572 Ok(())
573 }
574
575 /// Inquiry method that can be used to read the "Field List" specified for second-level Subscriptions.
576 ///
577 /// # Lifecycle
578 /// This method can only be called if the second-level of this Subscription has been initialized using a "Field List"
579 ///
580 /// # Errors
581 /// Returns an error if the Subscription mode is not COMMAND.
582 ///
583 /// # Returns
584 /// The list of fields to be subscribed to through the server, or `None` if the Subscription was initialized with a "Field Schema" or was not initialized at all.
585 ///
586 /// # See also
587 /// `Subscription.setCommandSecondLevelFields()`
588 pub fn get_command_second_level_fields(&self) -> Option<&Vec<String>> {
589 if self.mode != SubscriptionMode::Command {
590 return None;
591 }
592 self.command_second_level_fields.as_ref()
593 }
594
595 /// Setter method that sets the length to be requested to Lightstreamer Server for the internal queuing buffers for the items in the Subscription. A Queuing buffer is used by the Server to accumulate a burst of updates for an item, so that they can all be sent to the client, despite of bandwidth or frequency limits. It can be used only when the subscription mode is MERGE or DISTINCT and unfiltered dispatching has not been requested. Note that the Server may pose an upper limit on the size of its internal buffers.
596 ///
597 /// # Default
598 /// `None`, meaning to lean on the Server default based on the subscription mode. This means that the buffer size will be 1 for MERGE subscriptions and "unlimited" for DISTINCT subscriptions. See the "General Concepts" document for further details.
599 ///
600 /// # Lifecycle
601 /// This method can only be called while the Subscription instance is in its "inactive" state.
602 ///
603 /// # Errors
604 /// - Returns an error if the Subscription is currently "active".
605 /// - Returns an error if the specified value is not `None` nor "unlimited" nor a valid positive integer number.
606 ///
607 /// # Parameters
608 /// - `size`: An integer number, representing the length of the internal queuing buffers to be used in the Server. If the string "unlimited" is supplied, then no buffer size limit is requested (the check is case insensitive). It is also possible to supply a `None` value to stick to the Server default (which currently depends on the subscription mode).
609 ///
610 /// # See also
611 /// `Subscription.setRequestedMaxFrequency()`
612 pub fn set_requested_buffer_size(&mut self, size: Option<usize>) -> Result<(), String> {
613 if self.is_active {
614 return Err("Subscription is active".to_string());
615 }
616 self.requested_buffer_size = size;
617 Ok(())
618 }
619
620 /// Inquiry method that can be used to read the buffer size, configured though `setRequestedBufferSize()`, to be requested to the Server for this Subscription.
621 ///
622 /// # Lifecycle
623 /// This method can be called at any time.
624 ///
625 /// # Returns
626 /// An integer number, representing the buffer size to be requested to the server, or the string "unlimited", or `None`.
627 pub fn get_requested_buffer_size(&self) -> Option<&usize> {
628 self.requested_buffer_size.as_ref()
629 }
630
631 /// Setter method that sets the maximum update frequency to be requested to Lightstreamer Server for all the items in the Subscription. It can be used only if the Subscription mode is MERGE, DISTINCT or COMMAND (in the latter case, the frequency limitation applies to the UPDATE events for each single key). For Subscriptions with two-level behavior (see `Subscription.setCommandSecondLevelFields()` and `Subscription.setCommandSecondLevelFieldSchema()`), the specified frequency limit applies to both first-level and second-level items.
632 ///
633 /// Note that frequency limits on the items can also be set on the server side and this request can only be issued in order to furtherly reduce the frequency, not to rise it beyond these limits.
634 ///
635 /// This method can also be used to request unfiltered dispatching for the items in the Subscription. However, unfiltered dispatching requests may be refused if any frequency limit is posed on the server side for some item.
636 ///
637 /// # General Edition Note
638 /// A further global frequency limit could also be imposed by the Server, depending on Edition and License Type; this specific limit also applies to RAW mode and to unfiltered dispatching. To know what features are enabled by your license, please see the License tab of the Monitoring Dashboard (by default, available at /dashboard).
639 ///
640 /// # Default
641 /// `None`, meaning to lean on the Server default based on the subscription mode. This consists, for all modes, in not applying any frequency limit to the subscription (the same as "unlimited"); see the "General Concepts" document for further details.
642 ///
643 /// # Lifecycle
644 /// This method can can be called at any time with some differences based on the Subscription status:
645 ///
646 /// - If the Subscription instance is in its "inactive" state then this method can be called at will.
647 ///
648 /// - If the Subscription instance is in its "active" state then the method can still be called unless the current value is "unfiltered" or the supplied value is "unfiltered" or `None`. If the Subscription instance is in its "active" state and the connection to the server is currently open, then a request to change the frequency of the Subscription on the fly is sent to the server.
649 ///
650 /// # Errors
651 /// - Returns an error if the Subscription is currently "active" and the current value of this property is "unfiltered".
652 /// - Returns an error if the Subscription is currently "active" and the given parameter is `None` or "unfiltered".
653 /// - Returns an error if the specified value is not `None` nor one of the special "unlimited" and "unfiltered" values nor a valid positive number.
654 ///
655 /// # Parameters
656 /// - `freq`: A decimal number, representing the maximum update frequency (expressed in updates per second) for each item in the Subscription; for instance, with a setting of 0.5, for each single item, no more than one update every 2 seconds will be received. If the string "unlimited" is supplied, then no frequency limit is requested. It is also possible to supply the string "unfiltered", to ask for unfiltered dispatching, if it is allowed for the items, or a `None` value to stick to the Server default (which currently corresponds to "unlimited"). The check for the string constants is case insensitive.
657 pub fn set_requested_max_frequency(&mut self, freq: Option<f64>) -> Result<(), String> {
658 if self.is_active && self.requested_max_frequency.is_none() {
659 return Err("Subscription is active and current value is unfiltered".to_string());
660 }
661 if self.is_active && freq.is_none() {
662 return Err("Cannot set unfiltered while active".to_string());
663 }
664 if self.is_active && freq.is_none() {
665 return Err("Cannot set None while active".to_string());
666 }
667 self.requested_max_frequency = freq;
668 Ok(())
669 }
670
671 /// Inquiry method that can be used to read the max frequency, configured through `setRequestedMaxFrequency()`, to be requested to the Server for this Subscription.
672 ///
673 /// # Lifecycle
674 /// This method can be called at any time.
675 ///
676 /// # Returns
677 /// A decimal number, representing the max frequency to be requested to the server (expressed in updates per second), or the strings "unlimited" or "unfiltered", or `None`.
678 pub fn get_requested_max_frequency(&self) -> Option<&f64> {
679 self.requested_max_frequency.as_ref()
680 }
681
682 /// Setter method that enables/disables snapshot delivery request for the items in the Subscription. The snapshot can be requested only if the Subscription mode is MERGE, DISTINCT or COMMAND.
683 ///
684 /// # Default
685 /// "yes" if the Subscription mode is not "RAW", `None` otherwise.
686 ///
687 /// # Lifecycle
688 /// This method can only be called while the Subscription instance is in its "inactive" state.
689 ///
690 /// # Errors
691 /// - Returns an error if the Subscription is currently "active".
692 /// - Returns an error if the specified value is not "yes" nor "no" nor `None` nor a valid integer positive number.
693 /// - Returns an error if the specified value is not compatible with the mode of the Subscription:
694 /// - In case of a RAW Subscription only `None` is a valid value;
695 /// - In case of a non-DISTINCT Subscription only `None` "yes" and "no" are valid values.
696 ///
697 /// # Parameters
698 /// - `snapshot`: "yes"/"no" to request/not request snapshot delivery (the check is case insensitive). If the Subscription mode is DISTINCT, instead of "yes", it is also possible to supply an integer number, to specify the requested length of the snapshot (though the length of the received snapshot may be less than
699 /// requested, because of insufficient data or server side limits); passing "yes" means that the snapshot length should be determined only by the Server. `None` is also a valid value; if specified, no snapshot preference will be sent to the server that will decide itself whether or not to send any snapshot.
700 ///
701 /// # See also
702 /// `ItemUpdate.isSnapshot()`
703 pub fn set_requested_snapshot(&mut self, snapshot: Option<Snapshot>) -> Result<(), String> {
704 if self.is_active {
705 return Err("Subscription is active".to_string());
706 }
707 match snapshot {
708 Some(Snapshot::None) if self.mode == SubscriptionMode::Raw => {
709 return Err("Cannot request snapshot for Raw mode".to_string());
710 }
711 Some(Snapshot::Number(_)) if self.mode != SubscriptionMode::Distinct => {
712 return Err("Cannot specify snapshot length for non-Distinct mode".to_string());
713 }
714 _ => {}
715 }
716 self.requested_snapshot = snapshot;
717 Ok(())
718 }
719
720 /// Inquiry method that can be used to read the snapshot preferences, configured through `setRequestedSnapshot()`, to be requested to the Server for this Subscription.
721 ///
722 /// # Lifecycle
723 /// This method can be called at any time.
724 ///
725 /// # Returns
726 /// "yes", "no", `None`, or an integer number.
727 pub fn get_requested_snapshot(&self) -> Option<&Snapshot> {
728 self.requested_snapshot.as_ref()
729 }
730
731 /// Setter method that sets the selector name for all the items in the Subscription. The selector is a filter on the updates received. It is executed on the Server and implemented by the Metadata Adapter.
732 ///
733 /// # Default
734 /// `None` (no selector).
735 ///
736 /// # Lifecycle
737 /// This method can only be called while the Subscription instance is in its "inactive" state.
738 ///
739 /// # Errors
740 /// Returns an error if the Subscription is currently "active".
741 ///
742 /// # Parameters
743 /// - `selector`: The name of a selector, to be recognized by the Metadata Adapter, or `None` to unset the selector.
744 pub fn set_selector(&mut self, selector: Option<String>) -> Result<(), String> {
745 if self.is_active {
746 return Err("Subscription is active".to_string());
747 }
748 self.selector = selector;
749 Ok(())
750 }
751
752 /// Inquiry method that can be used to read the selector name specified for this Subscription through `setSelector()`.
753 ///
754 /// # Lifecycle
755 /// This method can be called at any time.
756 ///
757 /// # Returns
758 /// The name of the selector.
759 pub fn get_selector(&self) -> Option<&String> {
760 self.selector.as_ref()
761 }
762
763 /// Returns the latest value received for the specified item/field pair.
764 ///
765 /// It is suggested to consume real-time data by implementing and adding a proper SubscriptionListener rather than probing this method. In case of COMMAND Subscriptions, the value returned by this method may be misleading, as in COMMAND mode all the keys received, being part of the same item, will overwrite each other; for COMMAND Subscriptions, use `Subscription.getCommandValue()` instead.
766 ///
767 /// Note that internal data is cleared when the Subscription is unsubscribed from.
768 ///
769 /// # Lifecycle
770 /// This method can be called at any time; if called to retrieve a value that has not been received yet, then it will return `None`.
771 ///
772 /// # Errors
773 /// Returns an error if an invalid item name or field name is specified or if the specified item position or field position is out of bounds.
774 ///
775 /// # Parameters
776 /// - `item_pos`: A String representing an item in the configured item list or a Number representing the 1-based position of the item in the specified item group. (In case an item list was specified, passing the item position is also possible).
777 /// - `field_pos`: A String representing a field in the configured field list or a Number representing the 1-based position of the field in the specified field schema. (In case a field list was specified, passing the field position is also possible).
778 ///
779 /// # Returns
780 /// The current value for the specified field of the specified item(possibly `None`), or `None` if no value has been received yet.
781 pub fn get_value(&self, item_pos: usize, field_pos: usize) -> Option<&String> {
782 self.values.get(&(item_pos, field_pos))
783 }
784
785 /// Returns the latest value received for the specified item/key/field combination in a COMMAND Subscription. This method can only be used if the Subscription mode is COMMAND. Subscriptions with two-level behavior are also supported, hence the specified field can be either a first-level or a second-level one.
786 ///
787 /// It is suggested to consume real-time data by implementing and adding a proper SubscriptionListener rather than probing this method.
788 ///
789 /// Note that internal data is cleared when the Subscription is unsubscribed from.
790 ///
791 /// # Lifecycle
792 /// This method can be called at any time; if called to retrieve a value that has not been received yet, then it will return `None`.
793 ///
794 /// # Errors
795 /// - Returns an error if an invalid item name or field name is specified or if the specified item position or field position is out of bounds.
796 /// - Returns an error if the Subscription mode is not COMMAND.
797 ///
798 /// # Parameters
799 /// - `item_pos`: A String representing an item in the configured item list or a Number representing the 1-based position of the item in the specified item group. (In case an item list was specified, passing the item position is also possible).
800 /// - `key`: A String containing the value of a key received on the COMMAND subscription.
801 /// - `field_pos`: A String representing a field in the configured field list or a Number representing the 1-based position of the field in the specified field schema. (In case a field list was specified, passing the field position is also possible).
802 ///
803 /// # Returns
804 /// The current value for the specified field of the specified key within the specified item (possibly `None`), or `None` if the specified key has not been added yet (note that it might have been added and eventually deleted).
805 pub fn get_command_value(
806 &self,
807 item_pos: usize,
808 key: &str,
809 field_pos: usize,
810 ) -> Option<&String> {
811 let key = format!("{}_{}", item_pos, key);
812 self.command_values
813 .get(&key)
814 .and_then(|fields| fields.get(&field_pos))
815 }
816
817 /// Inquiry method that checks if the Subscription is currently "active" or not. Most of the Subscription properties cannot be modified if a Subscription is "active".
818 ///
819 /// The status of a Subscription is changed to "active" through the `LightstreamerClient.subscribe()` method and back to "inactive" through the `LightstreamerClient.unsubscribe()` one.
820 ///
821 /// # Lifecycle
822 /// This method can be called at any time.
823 ///
824 /// # Returns
825 /// `true`/`false` if the Subscription is "active" or not.
826 ///
827 /// # See also
828 /// `LightstreamerClient.subscribe()`
829 ///
830 /// # See also
831 /// `LightstreamerClient.unsubscribe()`
832 pub fn is_active(&self) -> bool {
833 self.is_active
834 }
835
836 /// Inquiry method that checks if the Subscription is currently subscribed to through the server or not.
837 ///
838 /// This flag is switched to true by server sent Subscription events, and back to false in case of client disconnection, `LightstreamerClient.unsubscribe()` calls and server sent unsubscription events.
839 ///
840 /// # Lifecycle
841 /// This method can be called at any time.
842 ///
843 /// # Returns
844 /// `true`/`false` if the Subscription is subscribed to through the server or not.
845 pub fn is_subscribed(&self) -> bool {
846 self.is_subscribed
847 }
848
849 /// Returns the position of the "key" field in a COMMAND Subscription.
850 ///
851 /// This method can only be used if the Subscription mode is COMMAND and the Subscription was initialized using a "Field Schema".
852 ///
853 /// # Lifecycle
854 /// This method can be called at any time after the first `SubscriptionListener.onSubscription()` event.
855 ///
856 /// # Errors
857 /// - Returns an error if the Subscription mode is not COMMAND or if the `SubscriptionListener.onSubscription()` event for this Subscription was not yet fired.
858 /// - Returns an error if a "Field List" was specified.
859 ///
860 /// # Returns
861 /// The 1-based position of the "key" field within the "Field Schema".
862 pub fn get_key_position(&self) -> Option<usize> {
863 if self.mode != SubscriptionMode::Command || !self.is_subscribed {
864 return None;
865 }
866 if let Some(ref schema) = self.field_schema {
867 return schema.split(',').position(|field| field.trim() == "key");
868 }
869 None
870 }
871
872 /// Returns the position of the "command" field in a COMMAND Subscription.
873 ///
874 /// This method can only be used if the Subscription mode is COMMAND and the Subscription was initialized using a "Field Schema".
875 ///
876 /// # Lifecycle
877 /// This method can be called at any time after the first `SubscriptionListener.onSubscription()` event.
878 ///
879 /// # Errors
880 /// - Returns an error if the Subscription mode is not COMMAND or if the `SubscriptionListener.onSubscription()` event for this Subscription was not yet fired.
881 ///
882 /// # Returns
883 /// The 1-based position of the "command" field within the "Field Schema".
884 pub fn get_command_position(&self) -> Option<usize> {
885 if self.mode != SubscriptionMode::Command || !self.is_subscribed {
886 return None;
887 }
888 if let Some(ref schema) = self.field_schema {
889 return schema
890 .split(',')
891 .position(|field| field.trim() == "command");
892 }
893 None
894 }
895
896 /*
897 /// Handles the subscription event.
898 pub fn on_subscription(&mut self) {
899 self.is_subscribed = true;
900 for listener in &mut self.listeners {
901 listener.on_subscription();
902 }
903 }
904
905 /// Handles the unsubscription event.
906 pub fn on_unsubscription(&mut self) {
907 self.is_subscribed = false;
908 self.values.clear();
909 self.command_values.clear();
910 for listener in &mut self.listeners {
911 listener.on_unsubscription();
912 }
913 }
914
915 /// Handles an update event for a regular Subscription.
916 pub fn on_update(&mut self, item_pos: usize, field_pos: usize, value: String, is_snapshot: bool) {
917 self.values.insert((item_pos, field_pos), value.clone());
918 for listener in &mut self.listeners {
919 listener.on_update(item_pos, field_pos, &value, is_snapshot);
920 }
921 }
922
923 /// Handles an update event for a COMMAND Subscription.
924 pub fn on_command_update(&mut self, key: String, item_pos: usize, field_pos: usize, value: String, is_snapshot: bool) {
925 self.command_values
926 .entry(key.clone())
927 .or_insert_with(HashMap::new)
928 .insert(field_pos, value.clone());
929 for listener in &mut self.listeners {
930 listener.on_command_update(&key, item_pos, field_pos, &value, is_snapshot);
931 }
932 }
933 */
934}
935
936impl Debug for Subscription {
937 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
938 f.debug_struct("Subscription")
939 .field("mode", &self.mode)
940 .field("item_group", &self.item_group)
941 .field("items", &self.items)
942 .field("field_schema", &self.field_schema)
943 .field("fields", &self.fields)
944 .field("data_adapter", &self.data_adapter)
945 .field(
946 "command_second_level_data_adapter",
947 &self.command_second_level_data_adapter,
948 )
949 .field(
950 "command_second_level_field_schema",
951 &self.command_second_level_field_schema,
952 )
953 .field(
954 "command_second_level_fields",
955 &self.command_second_level_fields,
956 )
957 .field("requested_buffer_size", &self.requested_buffer_size)
958 .field("requested_max_frequency", &self.requested_max_frequency)
959 .field("requested_snapshot", &self.requested_snapshot)
960 .field("selector", &self.selector)
961 .field("is_active", &self.is_active)
962 .field("is_subscribed", &self.is_subscribed)
963 .finish()
964 }
965}
966
967#[cfg(test)]
968mod tests {
969 use super::*;
970 use crate::subscription::ItemUpdate;
971 use crate::utils::LightstreamerError;
972 use std::sync::{Arc, Mutex};
973
974 struct MockSubscriptionListener {
975 subscription_called: Arc<Mutex<bool>>,
976 unsubscription_called: Arc<Mutex<bool>>,
977 item_update_called: Arc<Mutex<bool>>,
978 }
979
980 impl MockSubscriptionListener {
981 fn new() -> Self {
982 MockSubscriptionListener {
983 subscription_called: Arc::new(Mutex::new(false)),
984 unsubscription_called: Arc::new(Mutex::new(false)),
985 item_update_called: Arc::new(Mutex::new(false)),
986 }
987 }
988 }
989
990 impl SubscriptionListener for MockSubscriptionListener {
991 fn on_subscription(&mut self) {
992 if let Ok(mut guard) = self.subscription_called.lock() {
993 *guard = true;
994 }
995 }
996
997 fn on_unsubscription(&mut self) {
998 if let Ok(mut guard) = self.unsubscription_called.lock() {
999 *guard = true;
1000 }
1001 }
1002
1003 fn on_item_update(&self, _update: &ItemUpdate) {
1004 if let Ok(mut guard) = self.item_update_called.lock() {
1005 *guard = true;
1006 }
1007 }
1008 }
1009
1010 #[test]
1011 fn test_new_subscription() -> Result<(), LightstreamerError> {
1012 let subscription = Subscription::new(
1013 SubscriptionMode::Merge,
1014 Some(vec!["item1".to_string(), "item2".to_string()]),
1015 Some(vec!["field1".to_string(), "field2".to_string()]),
1016 )?;
1017
1018 assert_eq!(*subscription.get_mode(), SubscriptionMode::Merge);
1019 if let Some(items) = subscription.get_items() {
1020 assert_eq!(items, &vec!["item1".to_string(), "item2".to_string()]);
1021 }
1022 if let Some(fields) = subscription.get_fields() {
1023 assert_eq!(fields, &vec!["field1".to_string(), "field2".to_string()]);
1024 }
1025
1026 let subscription = Subscription::new(
1027 SubscriptionMode::Merge,
1028 None,
1029 Some(vec!["field1".to_string()]),
1030 );
1031 assert!(subscription.is_err());
1032
1033 let subscription = Subscription::new(
1034 SubscriptionMode::Merge,
1035 Some(vec!["item1".to_string()]),
1036 None,
1037 );
1038 assert!(subscription.is_err());
1039 Ok(())
1040 }
1041
1042 #[test]
1043 fn test_add_and_remove_listener() -> Result<(), LightstreamerError> {
1044 let mut subscription = Subscription::new(
1045 SubscriptionMode::Merge,
1046 Some(vec!["item1".to_string()]),
1047 Some(vec!["field1".to_string()]),
1048 )?;
1049 assert_eq!(subscription.get_listeners().len(), 0);
1050
1051 let listener = Box::new(MockSubscriptionListener::new());
1052 subscription.add_listener(listener);
1053 assert_eq!(subscription.get_listeners().len(), 1);
1054
1055 let listener2 = MockSubscriptionListener::new();
1056 subscription.remove_listener(&listener2);
1057 assert_eq!(subscription.get_listeners().len(), 1);
1058
1059 subscription.add_listener(Box::new(listener2));
1060 assert_eq!(subscription.get_listeners().len(), 2);
1061 Ok(())
1062 }
1063
1064 #[test]
1065 fn test_set_items() -> Result<(), LightstreamerError> {
1066 let mut subscription = Subscription::new(
1067 SubscriptionMode::Merge,
1068 Some(vec!["item1".to_string()]),
1069 Some(vec!["field1".to_string()]),
1070 )?;
1071
1072 let result = subscription.set_items(vec!["new_item1".to_string(), "new_item2".to_string()]);
1073 assert!(result.is_ok());
1074
1075 if let Some(items) = subscription.get_items() {
1076 assert_eq!(
1077 items,
1078 &vec!["new_item1".to_string(), "new_item2".to_string()]
1079 );
1080 }
1081
1082 subscription.is_active = true;
1083
1084 let result = subscription.set_items(vec!["another_item".to_string()]);
1085 assert!(result.is_err());
1086 Ok(())
1087 }
1088
1089 #[test]
1090 fn test_set_fields() -> Result<(), LightstreamerError> {
1091 let mut subscription = Subscription::new(
1092 SubscriptionMode::Merge,
1093 Some(vec!["item1".to_string()]),
1094 Some(vec!["field1".to_string()]),
1095 )?;
1096
1097 let result =
1098 subscription.set_fields(vec!["new_field1".to_string(), "new_field2".to_string()]);
1099 assert!(result.is_ok());
1100
1101 if let Some(fields) = subscription.get_fields() {
1102 assert_eq!(
1103 fields,
1104 &vec!["new_field1".to_string(), "new_field2".to_string()]
1105 );
1106 }
1107
1108 subscription.is_active = true;
1109
1110 let result = subscription.set_fields(vec!["another_field".to_string()]);
1111 assert!(result.is_err());
1112 Ok(())
1113 }
1114
1115 #[test]
1116 fn test_set_item_group() -> Result<(), LightstreamerError> {
1117 let mut subscription = Subscription::new(
1118 SubscriptionMode::Merge,
1119 Some(vec!["item1".to_string()]),
1120 Some(vec!["field1".to_string()]),
1121 )?;
1122
1123 let result = subscription.set_item_group("group1".to_string());
1124 assert!(result.is_ok());
1125
1126 if let Some(group) = subscription.get_item_group() {
1127 assert_eq!(group, "group1");
1128 }
1129
1130 subscription.is_active = true;
1131
1132 let result = subscription.set_item_group("another_group".to_string());
1133 assert!(result.is_err());
1134 Ok(())
1135 }
1136
1137 #[test]
1138 fn test_set_field_schema() -> Result<(), LightstreamerError> {
1139 let mut subscription = Subscription::new(
1140 SubscriptionMode::Merge,
1141 Some(vec!["item1".to_string()]),
1142 Some(vec!["field1".to_string()]),
1143 )?;
1144
1145 let result = subscription.set_field_schema("schema1".to_string());
1146 assert!(result.is_ok());
1147
1148 if let Some(schema) = subscription.get_field_schema() {
1149 assert_eq!(schema, "schema1");
1150 }
1151
1152 subscription.is_active = true;
1153
1154 let result = subscription.set_field_schema("another_schema".to_string());
1155 assert!(result.is_err());
1156 Ok(())
1157 }
1158
1159 #[test]
1160 fn test_set_data_adapter() -> Result<(), LightstreamerError> {
1161 let mut subscription = Subscription::new(
1162 SubscriptionMode::Merge,
1163 Some(vec!["item1".to_string()]),
1164 Some(vec!["field1".to_string()]),
1165 )?;
1166
1167 let result = subscription.set_data_adapter(Some("adapter1".to_string()));
1168 assert!(result.is_ok());
1169 if let Some(adapter) = subscription.get_data_adapter() {
1170 assert_eq!(adapter, "adapter1");
1171 }
1172
1173 subscription.is_active = true;
1174
1175 let result = subscription.set_data_adapter(Some("another_adapter".to_string()));
1176 assert!(result.is_err());
1177 Ok(())
1178 }
1179
1180 #[test]
1181 fn test_set_requested_snapshot() -> Result<(), LightstreamerError> {
1182 let mut subscription = Subscription::new(
1183 SubscriptionMode::Merge,
1184 Some(vec!["item1".to_string()]),
1185 Some(vec!["field1".to_string()]),
1186 )?;
1187
1188 let result = subscription.set_requested_snapshot(Some(Snapshot::Yes));
1189 assert!(result.is_ok());
1190
1191 if let Some(snapshot) = subscription.get_requested_snapshot() {
1192 match snapshot {
1193 Snapshot::Yes => {} // OK
1194 _ => panic!("Expected Snapshot::Yes"),
1195 }
1196 }
1197
1198 subscription.is_active = true;
1199
1200 let result = subscription.set_requested_snapshot(Some(Snapshot::No));
1201 assert!(result.is_err());
1202 Ok(())
1203 }
1204
1205 #[test]
1206 fn test_set_requested_buffer_size() -> Result<(), LightstreamerError> {
1207 let mut subscription = Subscription::new(
1208 SubscriptionMode::Merge,
1209 Some(vec!["item1".to_string()]),
1210 Some(vec!["field1".to_string()]),
1211 )?;
1212
1213 let result = subscription.set_requested_buffer_size(Some(10));
1214 assert!(result.is_ok());
1215
1216 if let Some(size) = subscription.get_requested_buffer_size() {
1217 assert_eq!(size, &10);
1218 }
1219
1220 subscription.is_active = true;
1221
1222 let result = subscription.set_requested_buffer_size(Some(20));
1223 assert!(result.is_err());
1224 Ok(())
1225 }
1226
1227 #[test]
1228 fn test_set_requested_max_frequency() -> Result<(), LightstreamerError> {
1229 let mut subscription = Subscription::new(
1230 SubscriptionMode::Merge,
1231 Some(vec!["item1".to_string()]),
1232 Some(vec!["field1".to_string()]),
1233 )?;
1234
1235 let result = subscription.set_requested_max_frequency(Some(10.5));
1236 assert!(result.is_ok());
1237
1238 if let Some(freq) = subscription.get_requested_max_frequency() {
1239 assert_eq!(freq, &10.5);
1240 }
1241
1242 subscription.is_active = true;
1243
1244 let result = subscription.set_requested_max_frequency(Some(20.5));
1245 assert!(result.is_ok());
1246
1247 subscription.requested_max_frequency = None;
1248 let result = subscription.set_requested_max_frequency(Some(30.5));
1249 assert!(result.is_err());
1250 Ok(())
1251 }
1252
1253 #[test]
1254 fn test_set_selector() -> Result<(), LightstreamerError> {
1255 let mut subscription = Subscription::new(
1256 SubscriptionMode::Merge,
1257 Some(vec!["item1".to_string()]),
1258 Some(vec!["field1".to_string()]),
1259 )?;
1260
1261 let result = subscription.set_selector(Some("selector1".to_string()));
1262 assert!(result.is_ok());
1263
1264 if let Some(selector) = subscription.get_selector() {
1265 assert_eq!(selector, "selector1");
1266 }
1267
1268 subscription.is_active = true;
1269
1270 let result = subscription.set_selector(Some("another_selector".to_string()));
1271 assert!(result.is_err());
1272 Ok(())
1273 }
1274
1275 #[test]
1276 fn test_command_second_level_methods() -> Result<(), LightstreamerError> {
1277 let mut subscription = Subscription::new(
1278 SubscriptionMode::Command,
1279 Some(vec!["item1".to_string()]),
1280 Some(vec!["field1".to_string()]),
1281 )?;
1282
1283 let result =
1284 subscription.set_command_second_level_data_adapter(Some("adapter1".to_string()));
1285 assert!(result.is_ok());
1286 if let Some(adapter) = subscription.get_command_second_level_data_adapter() {
1287 assert_eq!(adapter, "adapter1");
1288 }
1289
1290 subscription.is_active = true;
1291
1292 let result =
1293 subscription.set_command_second_level_data_adapter(Some("adapter2".to_string()));
1294 assert!(result.is_err());
1295
1296 subscription.is_active = false;
1297
1298 let result = subscription.set_command_second_level_fields(Some(vec![
1299 "field1".to_string(),
1300 "field2".to_string(),
1301 ]));
1302 assert!(result.is_ok());
1303 if let Some(fields) = subscription.get_command_second_level_fields() {
1304 assert_eq!(fields, &vec!["field1".to_string(), "field2".to_string()]);
1305 }
1306
1307 let result =
1308 subscription.set_command_second_level_field_schema(Some("schema1".to_string()));
1309 assert!(result.is_ok());
1310 if let Some(schema) = subscription.get_command_second_level_field_schema() {
1311 assert_eq!(schema, "schema1");
1312 }
1313
1314 let mut non_command_subscription = Subscription::new(
1315 SubscriptionMode::Merge,
1316 Some(vec!["item1".to_string()]),
1317 Some(vec!["field1".to_string()]),
1318 )?;
1319
1320 let result = non_command_subscription
1321 .set_command_second_level_data_adapter(Some("adapter1".to_string()));
1322 assert!(result.is_err());
1323
1324 let result = non_command_subscription
1325 .set_command_second_level_fields(Some(vec!["field1".to_string()]));
1326 assert!(result.is_err());
1327
1328 let result = non_command_subscription
1329 .set_command_second_level_field_schema(Some("schema1".to_string()));
1330 assert!(result.is_err());
1331 Ok(())
1332 }
1333
1334 #[test]
1335 fn test_is_active_and_is_subscribed() -> Result<(), LightstreamerError> {
1336 let subscription = Subscription::new(
1337 SubscriptionMode::Merge,
1338 Some(vec!["item1".to_string()]),
1339 Some(vec!["field1".to_string()]),
1340 )?;
1341
1342 assert!(!subscription.is_active());
1343 assert!(!subscription.is_subscribed());
1344 Ok(())
1345 }
1346
1347 #[test]
1348 fn test_get_key_position() -> Result<(), LightstreamerError> {
1349 // Create a COMMAND subscription with field_schema containing key
1350 let mut subscription = Subscription::new(
1351 SubscriptionMode::Command,
1352 Some(vec!["item1".to_string()]),
1353 Some(vec![
1354 "key".to_string(),
1355 "command".to_string(),
1356 "field1".to_string(),
1357 ]),
1358 )?;
1359
1360 // Set field_schema with key field
1361 subscription.field_schema = Some("key,command,field1".to_string());
1362
1363 // Not subscribed yet, should return None
1364 assert_eq!(subscription.get_key_position(), None);
1365
1366 // Mark as subscribed
1367 subscription.is_subscribed = true;
1368
1369 // Now it should return the position of key (0)
1370 assert_eq!(subscription.get_key_position(), Some(0));
1371
1372 // Test with a non-COMMAND subscription
1373 let mut non_command_subscription = Subscription::new(
1374 SubscriptionMode::Merge,
1375 Some(vec!["item1".to_string()]),
1376 Some(vec!["key".to_string(), "field1".to_string()]),
1377 )?;
1378
1379 non_command_subscription.field_schema = Some("key,field1".to_string());
1380 non_command_subscription.is_subscribed = true;
1381
1382 // Should return None for non-COMMAND subscription
1383 assert_eq!(non_command_subscription.get_key_position(), None);
1384
1385 // Test with COMMAND subscription but no key field
1386 let mut no_key_subscription = Subscription::new(
1387 SubscriptionMode::Command,
1388 Some(vec!["item1".to_string()]),
1389 Some(vec!["command".to_string(), "field1".to_string()]),
1390 )?;
1391
1392 no_key_subscription.field_schema = Some("command,field1".to_string());
1393 no_key_subscription.is_subscribed = true;
1394
1395 // Should return None when key field is not present
1396 assert_eq!(no_key_subscription.get_key_position(), None);
1397 Ok(())
1398 }
1399
1400 #[test]
1401 fn test_get_command_position() -> Result<(), LightstreamerError> {
1402 // Create a COMMAND subscription with field_schema containing command
1403 let mut subscription = Subscription::new(
1404 SubscriptionMode::Command,
1405 Some(vec!["item1".to_string()]),
1406 Some(vec![
1407 "key".to_string(),
1408 "command".to_string(),
1409 "field1".to_string(),
1410 ]),
1411 )?;
1412
1413 // Set field_schema with command field
1414 subscription.field_schema = Some("key,command,field1".to_string());
1415
1416 // Not subscribed yet, should return None
1417 assert_eq!(subscription.get_command_position(), None);
1418
1419 // Mark as subscribed
1420 subscription.is_subscribed = true;
1421
1422 // Now it should return the position of command (1)
1423 assert_eq!(subscription.get_command_position(), Some(1));
1424
1425 // Test with a non-COMMAND subscription
1426 let mut non_command_subscription = Subscription::new(
1427 SubscriptionMode::Merge,
1428 Some(vec!["item1".to_string()]),
1429 Some(vec!["command".to_string(), "field1".to_string()]),
1430 )?;
1431
1432 non_command_subscription.field_schema = Some("command,field1".to_string());
1433 non_command_subscription.is_subscribed = true;
1434
1435 // Should return None for non-COMMAND subscription
1436 assert_eq!(non_command_subscription.get_command_position(), None);
1437
1438 // Test with COMMAND subscription but no command field
1439 let mut no_command_subscription = Subscription::new(
1440 SubscriptionMode::Command,
1441 Some(vec!["item1".to_string()]),
1442 Some(vec!["key".to_string(), "field1".to_string()]),
1443 )?;
1444
1445 no_command_subscription.field_schema = Some("key,field1".to_string());
1446 no_command_subscription.is_subscribed = true;
1447
1448 // Should return None when command field is not present
1449 assert_eq!(no_command_subscription.get_command_position(), None);
1450 Ok(())
1451 }
1452
1453 #[test]
1454 fn test_debug_implementation() -> Result<(), LightstreamerError> {
1455 let subscription = Subscription::new(
1456 SubscriptionMode::Merge,
1457 Some(vec!["item1".to_string()]),
1458 Some(vec!["field1".to_string()]),
1459 )?;
1460
1461 // Test that Debug implementation works without panicking
1462 let debug_string = format!("{:?}", subscription);
1463
1464 // Verify it contains some expected fields
1465 assert!(debug_string.contains("mode"));
1466 assert!(debug_string.contains("items"));
1467 assert!(debug_string.contains("fields"));
1468 assert!(debug_string.contains("is_active"));
1469 assert!(debug_string.contains("is_subscribed"));
1470 Ok(())
1471 }
1472
1473 #[test]
1474 fn test_snapshot_display() {
1475 // Test the Display implementation for Snapshot
1476 assert_eq!(format!("{}", Snapshot::Yes), "true");
1477 assert_eq!(format!("{}", Snapshot::No), "false");
1478 assert_eq!(format!("{}", Snapshot::Number(5)), "5");
1479 assert_eq!(format!("{}", Snapshot::None), "");
1480 }
1481
1482 #[test]
1483 fn test_subscription_mode_display() {
1484 // Test the Display implementation for SubscriptionMode
1485 assert_eq!(format!("{}", SubscriptionMode::Merge), "MERGE");
1486 assert_eq!(format!("{}", SubscriptionMode::Distinct), "DISTINCT");
1487 assert_eq!(format!("{}", SubscriptionMode::Command), "COMMAND");
1488 assert_eq!(format!("{}", SubscriptionMode::Raw), "RAW");
1489 }
1490
1491 #[test]
1492 fn test_snapshot_default() {
1493 // Test the Default implementation for &Snapshot
1494 let default_snapshot: &Snapshot = Default::default();
1495 assert!(matches!(default_snapshot, &Snapshot::None));
1496 }
1497
1498 #[test]
1499 fn test_get_command_value() -> Result<(), LightstreamerError> {
1500 let mut subscription = Subscription::new(
1501 SubscriptionMode::Command,
1502 Some(vec!["item1".to_string()]),
1503 Some(vec![
1504 "key".to_string(),
1505 "command".to_string(),
1506 "field1".to_string(),
1507 ]),
1508 )?;
1509
1510 // Manually add some command values to test get_command_value
1511 let mut field_map = HashMap::new();
1512 field_map.insert(3, "test_value".to_string());
1513 subscription
1514 .command_values
1515 .insert("1_test_key".to_string(), field_map);
1516
1517 // Test getting an existing value
1518 let value = subscription.get_command_value(1, "test_key", 3);
1519 assert_eq!(value, Some(&"test_value".to_string()));
1520
1521 // Test getting a non-existent key
1522 let value = subscription.get_command_value(1, "non_existent_key", 3);
1523 assert_eq!(value, None);
1524
1525 // Test getting a non-existent field position
1526 let value = subscription.get_command_value(1, "test_key", 4);
1527 assert_eq!(value, None);
1528
1529 // Test getting a non-existent item position
1530 let value = subscription.get_command_value(2, "test_key", 3);
1531 assert_eq!(value, None);
1532 Ok(())
1533 }
1534
1535 #[test]
1536 fn test_set_requested_buffer_size_with_unlimited() -> Result<(), LightstreamerError> {
1537 let mut subscription = Subscription::new(
1538 SubscriptionMode::Merge,
1539 Some(vec!["item1".to_string()]),
1540 Some(vec!["field1".to_string()]),
1541 )?;
1542
1543 // Test setting buffer size to "unlimited"
1544 let result = subscription.set_requested_buffer_size(None);
1545 assert!(result.is_ok());
1546 assert_eq!(subscription.get_requested_buffer_size(), None);
1547 Ok(())
1548 }
1549
1550 #[test]
1551 fn test_command_second_level_field_methods_with_invalid_inputs()
1552 -> Result<(), LightstreamerError> {
1553 // Test with non-COMMAND subscription
1554 let mut non_command_subscription = Subscription::new(
1555 SubscriptionMode::Merge,
1556 Some(vec!["item1".to_string()]),
1557 Some(vec!["field1".to_string()]),
1558 )?;
1559
1560 // Test set_command_second_level_fields with invalid subscription mode
1561 let result = non_command_subscription
1562 .set_command_second_level_fields(Some(vec!["field1".to_string()]));
1563 assert!(result.is_err());
1564 assert_eq!(result.unwrap_err(), "Subscription mode is not Command");
1565
1566 // Test set_command_second_level_field_schema with invalid subscription mode
1567 let result = non_command_subscription
1568 .set_command_second_level_field_schema(Some("field1".to_string()));
1569 assert!(result.is_err());
1570 assert_eq!(result.unwrap_err(), "Subscription mode is not Command");
1571
1572 // Test with COMMAND subscription but active
1573 let mut command_subscription = Subscription::new(
1574 SubscriptionMode::Command,
1575 Some(vec!["item1".to_string()]),
1576 Some(vec!["field1".to_string()]),
1577 )?;
1578
1579 // Make the subscription active
1580 command_subscription.is_active = true;
1581
1582 // Test set_command_second_level_fields with active subscription
1583 let result =
1584 command_subscription.set_command_second_level_fields(Some(vec!["field1".to_string()]));
1585 assert!(result.is_err());
1586 assert_eq!(result.unwrap_err(), "Subscription is active");
1587
1588 // Test set_command_second_level_field_schema with active subscription
1589 let result =
1590 command_subscription.set_command_second_level_field_schema(Some("field1".to_string()));
1591 assert!(result.is_err());
1592 if let Err(e) = result {
1593 assert_eq!(e, "Subscription is active");
1594 }
1595 Ok(())
1596 }
1597
1598 #[test]
1599 fn test_set_data_adapter_with_invalid_inputs() -> Result<(), LightstreamerError> {
1600 let mut subscription = Subscription::new(
1601 SubscriptionMode::Merge,
1602 Some(vec!["item1".to_string()]),
1603 Some(vec!["field1".to_string()]),
1604 )?;
1605
1606 // Make the subscription active
1607 subscription.is_active = true;
1608
1609 // Test set_data_adapter with active subscription
1610 let result = subscription.set_data_adapter(Some("adapter1".to_string()));
1611 assert!(result.is_err());
1612 if let Err(e) = result {
1613 assert_eq!(e, "Subscription is active");
1614 }
1615 Ok(())
1616 }
1617
1618 #[test]
1619 fn test_set_selector_with_invalid_inputs() -> Result<(), LightstreamerError> {
1620 let mut subscription = Subscription::new(
1621 SubscriptionMode::Merge,
1622 Some(vec!["item1".to_string()]),
1623 Some(vec!["field1".to_string()]),
1624 )?;
1625
1626 // Make the subscription active
1627 subscription.is_active = true;
1628
1629 // Test set_selector with active subscription
1630 let result = subscription.set_selector(Some("selector1".to_string()));
1631 assert!(result.is_err());
1632 if let Err(e) = result {
1633 assert_eq!(e, "Subscription is active");
1634 }
1635 Ok(())
1636 }
1637}