Skip to main content

lightstreamer_rs/subscription/
model.rs

1// Copyright (C) 2024 Joaquín Béjar García
2// Portions of this file are derived from lightstreamer-client
3// Copyright (C) 2024 Daniel López Azaña
4// Original project: https://github.com/daniloaz/lightstreamer-client
5//
6// This file is part of lightstreamer-rs.
7//
8// lightstreamer-rs is free software: you can redistribute it and/or modify
9// it under the terms of the GNU General Public License as published by
10// the Free Software Foundation, either version 3 of the License, or
11// (at your option) any later version.
12//
13// lightstreamer-rs is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16// GNU General Public License for more details.
17//
18// You should have received a copy of the GNU General Public License
19// along with lightstreamer-rs. If not, see <https://www.gnu.org/licenses/>.
20
21use crate::subscription::SubscriptionListener;
22use crate::utils::LightstreamerError;
23use std::collections::HashMap;
24use std::fmt::{self, Debug, Formatter};
25use tokio::sync::mpsc::{Receiver, Sender, channel};
26
/// Snapshot delivery preference to be requested from Lightstreamer Server for the
/// items of a Subscription.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
#[repr(u8)]
pub enum Snapshot {
    /// Ask for the full snapshot of the subscribed items.
    Yes,
    /// Ask for no snapshot at all.
    No,
    /// Ask for a snapshot limited to the given length (number of updates).
    Number(usize),
    /// No snapshot preference is expressed. This is the variant produced by
    /// `Snapshot::default()`.
    #[default]
    None,
}
41
42impl Default for &Snapshot {
43    fn default() -> Self {
44        &Snapshot::None
45    }
46}
47
48impl fmt::Display for Snapshot {
49    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
50        match self {
51            Snapshot::Yes => write!(f, "true"),
52            Snapshot::No => write!(f, "false"),
53            Snapshot::Number(n) => write!(f, "{}", n),
54            Snapshot::None => write!(f, ""),
55        }
56    }
57}
58
/// Subscription mode of a Subscription.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[repr(u8)]
pub enum SubscriptionMode {
    /// MERGE mode: an update for an item is sent only when the state of at least one
    /// of its fields has changed.
    Merge,
    /// DISTINCT mode: an update is sent every time new data for the item is available.
    Distinct,
    /// RAW mode: every update received for an item is forwarded without any processing.
    Raw,
    /// COMMAND mode: updates are driven by add, update and delete commands.
    Command,
}
72
73impl fmt::Display for SubscriptionMode {
74    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
75        match self {
76            SubscriptionMode::Merge => write!(f, "MERGE"),
77            SubscriptionMode::Distinct => write!(f, "DISTINCT"),
78            SubscriptionMode::Command => write!(f, "COMMAND"),
79            SubscriptionMode::Raw => write!(f, "RAW"),
80        }
81    }
82}
83
84/// Struct representing a Subscription to be submitted to a Lightstreamer Server.
85/// It contains subscription details and the listeners needed to process the real-time data.
86pub struct Subscription {
87    /// The subscription mode for the items, required by Lightstreamer Server.
88    mode: SubscriptionMode,
89    /// An array of items to be subscribed to through Lightstreamer server.
90    items: Option<Vec<String>>,
91    /// An "Item Group" identifier representing a list of items.
92    item_group: Option<String>,
93    /// An array of fields for the items to be subscribed to through Lightstreamer Server.
94    fields: Option<Vec<String>>,
95    /// A "Field Schema" identifier representing a list of fields.
96    field_schema: Option<String>,
97    /// The name of the Data Adapter that supplies all the items for this Subscription.
98    data_adapter: Option<String>,
99    /// The name of the second-level Data Adapter for a COMMAND Subscription.
100    command_second_level_data_adapter: Option<String>,
101    /// The "Field List" to be subscribed to through Lightstreamer Server for the second-level items in a COMMAND Subscription.
102    command_second_level_fields: Option<Vec<String>>,
103    /// The "Field Schema" to be subscribed to through Lightstreamer Server for the second-level items in a COMMAND Subscription.
104    command_second_level_field_schema: Option<String>,
105    /// The length to be requested to Lightstreamer Server for the internal queuing buffers for the items in the Subscription.
106    requested_buffer_size: Option<usize>,
107    /// The maximum update frequency to be requested to Lightstreamer Server for all the items in the Subscription.
108    requested_max_frequency: Option<f64>,
109    /// The snapshot delivery preferences to be requested to Lightstreamer Server for the items in the Subscription.
110    requested_snapshot: Option<Snapshot>,
111    /// The selector name for all the items in the Subscription, used as a filter on the updates received.
112    selector: Option<String>,
113    /// A list of SubscriptionListener instances that will receive events from this Subscription.
114    listeners: Vec<Box<dyn SubscriptionListener>>,
115    /// A HashMap storing the latest values received for each item/field pair.
116    values: HashMap<(usize, usize), String>,
117    /// A HashMap storing the latest values received for each key/field pair in a COMMAND Subscription.
118    command_values: HashMap<String, HashMap<usize, String>>,
119    /// A flag indicating whether the Subscription is currently active or not.
120    is_active: bool,
121    /// A flag indicating whether the Subscription is currently subscribed to through the server or not.
122    is_subscribed: bool,
123    /// Client assigned subscription ID.
124    pub(crate) id: usize,
125    /// A channel sender to send the subscription ID to the Lightstreamer client.
126    pub(crate) id_sender: Sender<usize>,
127    /// A channel receiver to receive the subscription ID from the Lightstreamer client.
128    pub(crate) id_receiver: Receiver<usize>,
129}
130
131impl Subscription {
132    /// Constructor for creating a new Subscription instance.
133    ///
134    /// # Parameters
135    /// - `mode`: The subscription mode for the items, required by Lightstreamer Server.
136    /// - `items`: An array of items to be subscribed to through Lightstreamer server. It is also possible to specify the "Item List" or "Item Group" later.
137    /// - `fields`: An array of fields for the items to be subscribed to through Lightstreamer Server. It is also possible to specify the "Field List" or "Field Schema" later.
138    ///
139    /// # Errors
140    /// Returns an error if no items or fields are provided.
141    pub fn new(
142        mode: SubscriptionMode,
143        items: Option<Vec<String>>,
144        fields: Option<Vec<String>>,
145    ) -> Result<Subscription, LightstreamerError> {
146        if items.is_none() || fields.is_none() {
147            return Err(LightstreamerError::invalid_argument(
148                "Items and fields must be provided",
149            ));
150        }
151
152        // Channel capacity of 2 ensures subscription ID updates are not lost even if
153        // processing is slightly delayed. The ID is sent once per subscription lifecycle.
154        let (id_sender, id_receiver) = channel(2);
155
156        Ok(Subscription {
157            mode,
158            items,
159            item_group: None,
160            fields,
161            field_schema: None,
162            data_adapter: None,
163            command_second_level_data_adapter: None,
164            command_second_level_fields: None,
165            command_second_level_field_schema: None,
166            requested_buffer_size: None,
167            requested_max_frequency: None,
168            requested_snapshot: None,
169            selector: None,
170            listeners: Vec::new(),
171            values: HashMap::new(),
172            command_values: HashMap::new(),
173            is_active: false,
174            is_subscribed: false,
175            id: 0,
176            id_sender,
177            id_receiver,
178        })
179    }
180
181    /// Adds a listener that will receive events from the Subscription instance.
182    ///
183    /// The same listener can be added to several different Subscription instances.
184    ///
185    /// # Lifecycle
186    /// A listener can be added at any time. A call to add a listener already present will be ignored.
187    ///
188    /// # Parameters
189    /// - `listener`: An object that will receive the events as documented in the SubscriptionListener interface.
190    ///
191    /// # See also
192    /// `removeListener()`
193    pub fn add_listener(&mut self, listener: Box<dyn SubscriptionListener>) {
194        self.listeners.push(listener);
195    }
196
197    /// Removes a listener from the Subscription instance so that it will not receive events anymore.
198    ///
199    /// # Lifecycle
200    /// A listener can be removed at any time.
201    ///
202    /// # Parameters
203    /// - `listener`: The listener to be removed.
204    ///
205    /// # See also
206    /// `addListener()`
207    pub fn remove_listener<T>(&mut self, listener: &T)
208    where
209        T: SubscriptionListener,
210    {
211        self.listeners.retain(|l| {
212            let l_ref = l.as_ref() as &dyn SubscriptionListener;
213            let listener_ref = listener as &dyn SubscriptionListener;
214            std::ptr::addr_of!(*l_ref) != std::ptr::addr_of!(*listener_ref)
215        });
216    }
217
218    /// Returns a list containing the SubscriptionListener instances that were added to this client.
219    ///
220    /// # Returns
221    /// A list containing the listeners that were added to this client.
222    ///
223    /// # See also
224    /// `addListener()`
225    pub fn get_listeners(&self) -> &Vec<Box<dyn SubscriptionListener>> {
226        &self.listeners
227    }
228
229    /// Inquiry method that can be used to read the mode specified for this Subscription.
230    ///
231    /// # Lifecycle
232    /// This method can be called at any time.
233    ///
234    /// # Returns
235    /// The Subscription mode specified in the constructor.
236    pub fn get_mode(&self) -> &SubscriptionMode {
237        &self.mode
238    }
239
240    /// Setter method that sets the "Item Group" to be subscribed to through Lightstreamer Server.
241    ///
242    /// Any call to this method will override any "Item List" or "Item Group" previously specified.
243    ///
244    /// # Lifecycle
245    /// This method can only be called while the Subscription instance is in its "inactive" state.
246    ///
247    /// # Errors
248    /// Returns an error if the Subscription is currently "active".
249    ///
250    /// # Parameters
251    /// - `group`: A String to be expanded into an item list by the Metadata Adapter.
252    pub fn set_item_group(&mut self, group: String) -> Result<(), String> {
253        if self.is_active {
254            return Err("Subscription is active. This method can only be called while the Subscription instance is in its 'inactive' state.".to_string());
255        }
256        self.item_group = Some(group);
257        Ok(())
258    }
259
260    /// Inquiry method that can be used to read the item group specified for this Subscription.
261    ///
262    /// # Lifecycle
263    /// This method can only be called if the Subscription has been initialized using an "Item Group"
264    ///
265    /// # Returns
266    /// The "Item Group" to be subscribed to through the server, or `None` if the Subscription was initialized with an "Item List" or was not initialized at all.
267    pub fn get_item_group(&self) -> Option<&String> {
268        self.item_group.as_ref()
269    }
270
271    /// Setter method that sets the "Item List" to be subscribed to through Lightstreamer Server.
272    ///
273    /// Any call to this method will override any "Item List" or "Item Group" previously specified.
274    ///
275    /// # Lifecycle
276    /// This method can only be called while the Subscription instance is in its "inactive" state.
277    ///
278    /// # Errors
279    /// - Returns an error if the Subscription is currently "active".
280    /// - Returns an error if any of the item names in the "Item List" contains a space, is a number, or is empty/None.
281    ///
282    /// # Parameters
283    /// - `items`: An array of items to be subscribed to through the server.
284    pub fn set_items(&mut self, items: Vec<String>) -> Result<(), String> {
285        if self.is_active {
286            return Err("Subscription is active".to_string());
287        }
288        for item in &items {
289            if item.contains(" ") || item.parse::<usize>().is_ok() || item.is_empty() {
290                return Err("Invalid item name".to_string());
291            }
292        }
293        self.items = Some(items);
294        Ok(())
295    }
296
297    /// Inquiry method that can be used to read the "Item List" specified for this Subscription.
298    /// Note that if the single-item-constructor was used, this method will return an array of length 1 containing such item.
299    ///
300    /// # Lifecycle
301    /// This method can only be called if the Subscription has been initialized with an "Item List".
302    ///
303    /// # Returns
304    /// The "Item List" to be subscribed to through the server, or `None` if the Subscription was initialized with an "Item Group" or was not initialized at all.
305    pub fn get_items(&self) -> Option<&Vec<String>> {
306        self.items.as_ref()
307    }
308
309    /// Setter method that sets the "Field Schema" to be subscribed to through Lightstreamer Server.
310    ///
311    /// Any call to this method will override any "Field List" or "Field Schema" previously specified.
312    ///
313    /// # Lifecycle
314    /// This method can only be called while the Subscription instance is in its "inactive" state.
315    ///
316    /// # Errors
317    /// Returns an error if the Subscription is currently "active".
318    ///
319    /// # Parameters
320    /// - `schema`: A String to be expanded into a field list by the Metadata Adapter.
321    pub fn set_field_schema(&mut self, schema: String) -> Result<(), String> {
322        if self.is_active {
323            return Err("Subscription is active".to_string());
324        }
325        self.field_schema = Some(schema);
326        Ok(())
327    }
328
329    /// Inquiry method that can be used to read the field schema specified for this Subscription.
330    ///
331    /// # Lifecycle
332    /// This method can only be called if the Subscription has been initialized using a "Field Schema"
333    ///
334    /// # Returns
335    /// The "Field Schema" to be subscribed to through the server, or `None` if the Subscription was initialized with a "Field List" or was not initialized at all.
336    pub fn get_field_schema(&self) -> Option<&String> {
337        self.field_schema.as_ref()
338    }
339
340    /// Setter method that sets the "Field List" to be subscribed to through Lightstreamer Server.
341    ///
342    /// Any call to this method will override any "Field List" or "Field Schema" previously specified.
343    ///
344    /// # Lifecycle
345    /// This method can only be called while the Subscription instance is in its "inactive" state.
346    ///
347    /// # Errors
348    /// - Returns an error if the Subscription is currently "active".
349    /// - Returns an error if any of the field names in the list contains a space or is empty/None.
350    ///
351    /// # Parameters
352    /// - `fields`: An array of fields to be subscribed to through the server.
353    pub fn set_fields(&mut self, fields: Vec<String>) -> Result<(), String> {
354        if self.is_active {
355            return Err("Subscription is active".to_string());
356        }
357        for field in &fields {
358            if field.contains(" ") || field.is_empty() {
359                return Err("Invalid field name".to_string());
360            }
361        }
362        self.fields = Some(fields);
363        Ok(())
364    }
365
366    /// Inquiry method that can be used to read the "Field List" specified for this Subscription.
367    ///
368    /// # Lifecycle
369    /// This method can only be called if the Subscription has been initialized using a "Field List".
370    ///
371    /// # Returns
372    /// The "Field List" to be subscribed to through the server, or `None` if the Subscription was initialized with a "Field Schema" or was not initialized at all.
373    pub fn get_fields(&self) -> Option<&Vec<String>> {
374        self.fields.as_ref()
375    }
376
377    /// Setter method that sets the name of the Data Adapter (within the Adapter Set used by the current session) that supplies all the items for this Subscription.
378    ///
379    /// The Data Adapter name is configured on the server side through the "name" attribute of the `<data_provider>` element, in the "adapters.xml" file that defines the Adapter Set (a missing attribute configures the "DEFAULT" name).
380    ///
381    /// Note that if more than one Data Adapter is needed to supply all the items in a set of items, then it is not possible to group all the items of the set in a single Subscription. Multiple Subscriptions have to be defined.
382    ///
383    /// # Default
384    /// The default Data Adapter for the Adapter Set, configured as "DEFAULT" on the Server.
385    ///
386    /// # Lifecycle
387    /// This method can only be called while the Subscription instance is in its "inactive" state.
388    ///
389    /// # Errors
390    /// Returns an error if the Subscription is currently "active".
391    ///
392    /// # Parameters
393    /// - `adapter`: The name of the Data Adapter. A `None` value is equivalent to the "DEFAULT" name.
394    ///
395    /// # See also
396    /// `ConnectionDetails.setAdapterSet()`
397    pub fn set_data_adapter(&mut self, adapter: Option<String>) -> Result<(), String> {
398        if self.is_active {
399            return Err("Subscription is active".to_string());
400        }
401        self.data_adapter = adapter;
402        Ok(())
403    }
404
405    /// Inquiry method that can be used to read the name of the Data Adapter specified for this Subscription through `setDataAdapter()`.
406    ///
407    /// # Lifecycle
408    /// This method can be called at any time.
409    ///
410    /// # Returns
411    /// The name of the Data Adapter; returns `None` if no name has been configured, so that the "DEFAULT" Adapter Set is used.
412    pub fn get_data_adapter(&self) -> Option<&String> {
413        self.data_adapter.as_ref()
414    }
415
416    /// Setter method that sets the name of the second-level Data Adapter (within the Adapter Set used by the current session)
417    /// Setter method that sets the name of the second-level Data Adapter (within the Adapter Set used by the current session) that supplies all the second-level items for a COMMAND Subscription.
418    ///
419    /// All the possible second-level items should be supplied in "MERGE" mode with snapshot available.
420    ///
421    /// The Data Adapter name is configured on the server side through the "name" attribute of the `<data_provider>` element, in the "adapters.xml" file that defines the Adapter Set (a missing attribute configures the "DEFAULT" name).
422    ///
423    /// # Default
424    /// The default Data Adapter for the Adapter Set, configured as "DEFAULT" on the Server.
425    ///
426    /// # Lifecycle
427    /// This method can only be called while the Subscription instance is in its "inactive" state.
428    ///
429    /// # Errors
430    /// - Returns an error if the Subscription is currently "active".
431    /// - Returns an error if the Subscription mode is not "COMMAND".
432    ///
433    /// # Parameters
434    /// - `adapter`: The name of the Data Adapter. A `None` value is equivalent to the "DEFAULT" name.
435    ///
436    /// # See also
437    /// `Subscription.setCommandSecondLevelFields()`
438    ///
439    /// # See also
440    /// `Subscription.setCommandSecondLevelFieldSchema()`
441    pub fn set_command_second_level_data_adapter(
442        &mut self,
443        adapter: Option<String>,
444    ) -> Result<(), String> {
445        if self.is_active {
446            return Err("Subscription is active".to_string());
447        }
448        if self.mode != SubscriptionMode::Command {
449            return Err("Subscription mode is not Command".to_string());
450        }
451        self.command_second_level_data_adapter = adapter;
452        Ok(())
453    }
454
455    /// Inquiry method that can be used to read the name of the second-level Data Adapter specified for this Subscription through `setCommandSecondLevelDataAdapter()`.
456    ///
457    /// # Lifecycle
458    /// This method can be called at any time.
459    ///
460    /// # Errors
461    /// Returns an error if the Subscription mode is not COMMAND.
462    ///
463    /// # Returns
464    /// The name of the second-level Data Adapter.
465    ///
466    /// # See also
467    /// `setCommandSecondLevelDataAdapter()`
468    pub fn get_command_second_level_data_adapter(&self) -> Option<&String> {
469        if self.mode != SubscriptionMode::Command {
470            return None;
471        }
472        self.command_second_level_data_adapter.as_ref()
473    }
474
475    /// Setter method that sets the "Field Schema" to be subscribed to through Lightstreamer Server for the second-level items. It can only be used on COMMAND Subscriptions.
476    ///
477    /// Any call to this method will override any "Field List" or "Field Schema" previously specified for the second-level.
478    ///
479    /// Calling this method enables the two-level behavior:
480    ///
481    /// In synthesis, each time a new key is received on the COMMAND Subscription, the key value is treated as an Item name and an underlying Subscription for this Item is created and subscribed to automatically, to feed fields specified by this method. This mono-item Subscription is specified through an "Item List" containing only the Item name received. As a consequence, all the conditions provided for subscriptions through Item Lists have to be satisfied. The item is subscribed to in "MERGE" mode, with snapshot request and with the same maximum frequency setting as for the first-level items (including the "unfiltered" case). All other Subscription properties are left as the default. When the key is deleted by a DELETE command on the first-level Subscription, the associated second-level Subscription is also unsubscribed from.
482    ///
483    /// Specify `None` as parameter will disable the two-level behavior.
484    ///
485    /// # Lifecycle
486    /// This method can only be called while the Subscription instance is in its "inactive" state.
487    ///
488    /// # Errors
489    /// - Returns an error if the Subscription is currently "active".
490    /// - Returns an error if the Subscription mode is not "COMMAND".
491    ///
492    /// # Parameters
493    /// - `schema`: A String to be expanded into a field list by the Metadata Adapter.
494    ///
495    /// # See also
496    /// `Subscription.setCommandSecondLevelFields()`
497    pub fn set_command_second_level_field_schema(
498        &mut self,
499        schema: Option<String>,
500    ) -> Result<(), String> {
501        if self.is_active {
502            return Err("Subscription is active".to_string());
503        }
504        if self.mode != SubscriptionMode::Command {
505            return Err("Subscription mode is not Command".to_string());
506        }
507        self.command_second_level_field_schema = schema;
508        Ok(())
509    }
510
511    /// Inquiry method that can be used to read the "Field Schema" specified for second-level Subscriptions.
512    ///
513    /// # Lifecycle
514    /// This method can only be called if the second-level of this Subscription has been initialized using a "Field Schema".
515    ///
516    /// # Errors
517    /// Returns an error if the Subscription mode is not COMMAND.
518    ///
519    /// # Returns
520    /// The "Field Schema" to be subscribed to through the server, or `None` if the Subscription was initialized with a "Field List" or was not initialized at all.
521    ///
522    /// # See also
523    /// `Subscription.setCommandSecondLevelFieldSchema()`
524    pub fn get_command_second_level_field_schema(&self) -> Option<&String> {
525        if self.mode != SubscriptionMode::Command {
526            return None;
527        }
528        self.command_second_level_field_schema.as_ref()
529    }
530
531    /// Setter method that sets the "Field List" to be subscribed to through Lightstreamer Server for the second-level items. It can only be used on COMMAND Subscriptions.
532    ///
533    /// Any call to this method will override any "Field List" or "Field Schema" previously specified for the second-level.
534    ///
535    /// Calling this method enables the two-level behavior:
536    ///
537    /// In synthesis, each time a new key is received on the COMMAND Subscription, the key value is treated as an Item name and an underlying Subscription for this Item is created and subscribed to automatically, to feed fields specified by this method. This mono-item Subscription is specified through an "Item List" containing only the Item name received. As a consequence, all the conditions provided for subscriptions through Item Lists have to be satisfied. The item is subscribed to in "MERGE" mode, with snapshot request and with the same maximum frequency setting as for the first-level items (including the "unfiltered" case). All other Subscription properties are left as the default. When the key is deleted by a DELETE command on the first-level Subscription, the associated second-level Subscription is also unsubscribed from.
538    ///
539    /// Specifying `None` as parameter will disable the two-level behavior.
540    ///
541    /// # Lifecycle
542    /// This method can only be called while the Subscription instance is in its "inactive" state.
543    ///
544    /// # Errors
545    /// - Returns an error if the Subscription is currently "active".
546    /// - Returns an error if the Subscription mode is not "COMMAND".
547    /// - Returns an error if any of the field names in the "Field List" contains a space or is empty/None.
548    ///
549    /// # Parameters
550    /// - `fields`: An array of Strings containing a list of fields to be subscribed to through the server. Ensure that no name conflict is generated between first-level and second-level fields. In case of conflict, the second-level field will not be accessible by name, but only by position.
551    ///
552    /// # See also
553    /// `Subscription.setCommandSecondLevelFieldSchema()`
554    pub fn set_command_second_level_fields(
555        &mut self,
556        fields: Option<Vec<String>>,
557    ) -> Result<(), String> {
558        if self.is_active {
559            return Err("Subscription is active".to_string());
560        }
561        if self.mode != SubscriptionMode::Command {
562            return Err("Subscription mode is not Command".to_string());
563        }
564        if let Some(ref fields) = fields {
565            for field in fields {
566                if field.contains(" ") || field.is_empty() {
567                    return Err("Invalid field name".to_string());
568                }
569            }
570        }
571        self.command_second_level_fields = fields;
572        Ok(())
573    }
574
575    /// Inquiry method that can be used to read the "Field List" specified for second-level Subscriptions.
576    ///
577    /// # Lifecycle
578    /// This method can only be called if the second-level of this Subscription has been initialized using a "Field List"
579    ///
580    /// # Errors
581    /// Returns an error if the Subscription mode is not COMMAND.
582    ///
583    /// # Returns
584    /// The list of fields to be subscribed to through the server, or `None` if the Subscription was initialized with a "Field Schema" or was not initialized at all.
585    ///
586    /// # See also
587    /// `Subscription.setCommandSecondLevelFields()`
588    pub fn get_command_second_level_fields(&self) -> Option<&Vec<String>> {
589        if self.mode != SubscriptionMode::Command {
590            return None;
591        }
592        self.command_second_level_fields.as_ref()
593    }
594
595    /// Setter method that sets the length to be requested to Lightstreamer Server for the internal queuing buffers for the items in the Subscription. A Queuing buffer is used by the Server to accumulate a burst of updates for an item, so that they can all be sent to the client, despite of bandwidth or frequency limits. It can be used only when the subscription mode is MERGE or DISTINCT and unfiltered dispatching has not been requested. Note that the Server may pose an upper limit on the size of its internal buffers.
596    ///
597    /// # Default
598    /// `None`, meaning to lean on the Server default based on the subscription mode. This means that the buffer size will be 1 for MERGE subscriptions and "unlimited" for DISTINCT subscriptions. See the "General Concepts" document for further details.
599    ///
600    /// # Lifecycle
601    /// This method can only be called while the Subscription instance is in its "inactive" state.
602    ///
603    /// # Errors
604    /// - Returns an error if the Subscription is currently "active".
605    /// - Returns an error if the specified value is not `None` nor "unlimited" nor a valid positive integer number.
606    ///
607    /// # Parameters
608    /// - `size`: An integer number, representing the length of the internal queuing buffers to be used in the Server. If the string "unlimited" is supplied, then no buffer size limit is requested (the check is case insensitive). It is also possible to supply a `None` value to stick to the Server default (which currently depends on the subscription mode).
609    ///
610    /// # See also
611    /// `Subscription.setRequestedMaxFrequency()`
612    pub fn set_requested_buffer_size(&mut self, size: Option<usize>) -> Result<(), String> {
613        if self.is_active {
614            return Err("Subscription is active".to_string());
615        }
616        self.requested_buffer_size = size;
617        Ok(())
618    }
619
620    /// Inquiry method that can be used to read the buffer size, configured though `setRequestedBufferSize()`, to be requested to the Server for this Subscription.
621    ///
622    /// # Lifecycle
623    /// This method can be called at any time.
624    ///
625    /// # Returns
626    /// An integer number, representing the buffer size to be requested to the server, or the string "unlimited", or `None`.
627    pub fn get_requested_buffer_size(&self) -> Option<&usize> {
628        self.requested_buffer_size.as_ref()
629    }
630
631    /// Setter method that sets the maximum update frequency to be requested to Lightstreamer Server for all the items in the Subscription. It can be used only if the Subscription mode is MERGE, DISTINCT or COMMAND (in the latter case, the frequency limitation applies to the UPDATE events for each single key). For Subscriptions with two-level behavior (see `Subscription.setCommandSecondLevelFields()` and `Subscription.setCommandSecondLevelFieldSchema()`), the specified frequency limit applies to both first-level and second-level items.
632    ///
    /// Note that frequency limits on the items can also be set on the server side and this request can only be issued in order to further reduce the frequency, not to raise it beyond these limits.
634    ///
635    /// This method can also be used to request unfiltered dispatching for the items in the Subscription. However, unfiltered dispatching requests may be refused if any frequency limit is posed on the server side for some item.
636    ///
637    /// # General Edition Note
638    /// A further global frequency limit could also be imposed by the Server, depending on Edition and License Type; this specific limit also applies to RAW mode and to unfiltered dispatching. To know what features are enabled by your license, please see the License tab of the Monitoring Dashboard (by default, available at /dashboard).
639    ///
640    /// # Default
641    /// `None`, meaning to lean on the Server default based on the subscription mode. This consists, for all modes, in not applying any frequency limit to the subscription (the same as "unlimited"); see the "General Concepts" document for further details.
642    ///
643    /// # Lifecycle
644    /// This method can can be called at any time with some differences based on the Subscription status:
645    ///
646    /// - If the Subscription instance is in its "inactive" state then this method can be called at will.
647    ///
648    /// - If the Subscription instance is in its "active" state then the method can still be called unless the current value is "unfiltered" or the supplied value is "unfiltered" or `None`. If the Subscription instance is in its "active" state and the connection to the server is currently open, then a request to change the frequency of the Subscription on the fly is sent to the server.
649    ///
650    /// # Errors
651    /// - Returns an error if the Subscription is currently "active" and the current value of this property is "unfiltered".
652    /// - Returns an error if the Subscription is currently "active" and the given parameter is `None` or "unfiltered".
653    /// - Returns an error if the specified value is not `None` nor one of the special "unlimited" and "unfiltered" values nor a valid positive number.
654    ///
655    /// # Parameters
656    /// - `freq`: A decimal number, representing the maximum update frequency (expressed in updates per second) for each item in the Subscription; for instance, with a setting of 0.5, for each single item, no more than one update every 2 seconds will be received. If the string "unlimited" is supplied, then no frequency limit is requested. It is also possible to supply the string "unfiltered", to ask for unfiltered dispatching, if it is allowed for the items, or a `None` value to stick to the Server default (which currently corresponds to "unlimited"). The check for the string constants is case insensitive.
657    pub fn set_requested_max_frequency(&mut self, freq: Option<f64>) -> Result<(), String> {
658        if self.is_active && self.requested_max_frequency.is_none() {
659            return Err("Subscription is active and current value is unfiltered".to_string());
660        }
661        if self.is_active && freq.is_none() {
662            return Err("Cannot set unfiltered while active".to_string());
663        }
664        if self.is_active && freq.is_none() {
665            return Err("Cannot set None while active".to_string());
666        }
667        self.requested_max_frequency = freq;
668        Ok(())
669    }
670
671    /// Inquiry method that can be used to read the max frequency, configured through `setRequestedMaxFrequency()`, to be requested to the Server for this Subscription.
672    ///
673    /// # Lifecycle
674    /// This method can be called at any time.
675    ///
676    /// # Returns
677    /// A decimal number, representing the max frequency to be requested to the server (expressed in updates per second), or the strings "unlimited" or "unfiltered", or `None`.
678    pub fn get_requested_max_frequency(&self) -> Option<&f64> {
679        self.requested_max_frequency.as_ref()
680    }
681
682    /// Setter method that enables/disables snapshot delivery request for the items in the Subscription. The snapshot can be requested only if the Subscription mode is MERGE, DISTINCT or COMMAND.
683    ///
684    /// # Default
685    /// "yes" if the Subscription mode is not "RAW", `None` otherwise.
686    ///
687    /// # Lifecycle
688    /// This method can only be called while the Subscription instance is in its "inactive" state.
689    ///
690    /// # Errors
691    /// - Returns an error if the Subscription is currently "active".
692    /// - Returns an error if the specified value is not "yes" nor "no" nor `None` nor a valid integer positive number.
693    /// - Returns an error if the specified value is not compatible with the mode of the Subscription:
694    ///     - In case of a RAW Subscription only `None` is a valid value;
695    ///     - In case of a non-DISTINCT Subscription only `None` "yes" and "no" are valid values.
696    ///
697    /// # Parameters
698    /// - `snapshot`: "yes"/"no" to request/not request snapshot delivery (the check is case insensitive). If the Subscription mode is DISTINCT, instead of "yes", it is also possible to supply an integer number, to specify the requested length of the snapshot (though the length of the received snapshot may be less than
699    ///   requested, because of insufficient data or server side limits); passing "yes" means that the snapshot length should be determined only by the Server. `None` is also a valid value; if specified, no snapshot preference will be sent to the server that will decide itself whether or not to send any snapshot.
700    ///
701    /// # See also
702    /// `ItemUpdate.isSnapshot()`
703    pub fn set_requested_snapshot(&mut self, snapshot: Option<Snapshot>) -> Result<(), String> {
704        if self.is_active {
705            return Err("Subscription is active".to_string());
706        }
707        match snapshot {
708            Some(Snapshot::None) if self.mode == SubscriptionMode::Raw => {
709                return Err("Cannot request snapshot for Raw mode".to_string());
710            }
711            Some(Snapshot::Number(_)) if self.mode != SubscriptionMode::Distinct => {
712                return Err("Cannot specify snapshot length for non-Distinct mode".to_string());
713            }
714            _ => {}
715        }
716        self.requested_snapshot = snapshot;
717        Ok(())
718    }
719
720    /// Inquiry method that can be used to read the snapshot preferences, configured through `setRequestedSnapshot()`, to be requested to the Server for this Subscription.
721    ///
722    /// # Lifecycle
723    /// This method can be called at any time.
724    ///
725    /// # Returns
726    /// "yes", "no", `None`, or an integer number.
727    pub fn get_requested_snapshot(&self) -> Option<&Snapshot> {
728        self.requested_snapshot.as_ref()
729    }
730
731    /// Setter method that sets the selector name for all the items in the Subscription. The selector is a filter on the updates received. It is executed on the Server and implemented by the Metadata Adapter.
732    ///
733    /// # Default
734    /// `None` (no selector).
735    ///
736    /// # Lifecycle
737    /// This method can only be called while the Subscription instance is in its "inactive" state.
738    ///
739    /// # Errors
740    /// Returns an error if the Subscription is currently "active".
741    ///
742    /// # Parameters
743    /// - `selector`: The name of a selector, to be recognized by the Metadata Adapter, or `None` to unset the selector.
744    pub fn set_selector(&mut self, selector: Option<String>) -> Result<(), String> {
745        if self.is_active {
746            return Err("Subscription is active".to_string());
747        }
748        self.selector = selector;
749        Ok(())
750    }
751
752    /// Inquiry method that can be used to read the selector name specified for this Subscription through `setSelector()`.
753    ///
754    /// # Lifecycle
755    /// This method can be called at any time.
756    ///
757    /// # Returns
758    /// The name of the selector.
759    pub fn get_selector(&self) -> Option<&String> {
760        self.selector.as_ref()
761    }
762
763    /// Returns the latest value received for the specified item/field pair.
764    ///
765    /// It is suggested to consume real-time data by implementing and adding a proper SubscriptionListener rather than probing this method. In case of COMMAND Subscriptions, the value returned by this method may be misleading, as in COMMAND mode all the keys received, being part of the same item, will overwrite each other; for COMMAND Subscriptions, use `Subscription.getCommandValue()` instead.
766    ///
767    /// Note that internal data is cleared when the Subscription is unsubscribed from.
768    ///
769    /// # Lifecycle
770    /// This method can be called at any time; if called to retrieve a value that has not been received yet, then it will return `None`.
771    ///
772    /// # Errors
773    /// Returns an error if an invalid item name or field name is specified or if the specified item position or field position is out of bounds.
774    ///
775    /// # Parameters
776    /// - `item_pos`: A String representing an item in the configured item list or a Number representing the 1-based position of the item in the specified item group. (In case an item list was specified, passing the item position is also possible).
777    /// - `field_pos`: A String representing a field in the configured field list or a Number representing the 1-based position of the field in the specified field schema. (In case a field list was specified, passing the field position is also possible).
778    ///
779    /// # Returns
780    /// The current value for the specified field of the specified item(possibly `None`), or `None` if no value has been received yet.
781    pub fn get_value(&self, item_pos: usize, field_pos: usize) -> Option<&String> {
782        self.values.get(&(item_pos, field_pos))
783    }
784
785    /// Returns the latest value received for the specified item/key/field combination in a COMMAND Subscription. This method can only be used if the Subscription mode is COMMAND. Subscriptions with two-level behavior are also supported, hence the specified field can be either a first-level or a second-level one.
786    ///
787    /// It is suggested to consume real-time data by implementing and adding a proper SubscriptionListener rather than probing this method.
788    ///
789    /// Note that internal data is cleared when the Subscription is unsubscribed from.
790    ///
791    /// # Lifecycle
792    /// This method can be called at any time; if called to retrieve a value that has not been received yet, then it will return `None`.
793    ///
794    /// # Errors
795    /// - Returns an error if an invalid item name or field name is specified or if the specified item position or field position is out of bounds.
796    /// - Returns an error if the Subscription mode is not COMMAND.
797    ///
798    /// # Parameters
799    /// - `item_pos`: A String representing an item in the configured item list or a Number representing the 1-based position of the item in the specified item group. (In case an item list was specified, passing the item position is also possible).
800    /// - `key`: A String containing the value of a key received on the COMMAND subscription.
801    /// - `field_pos`: A String representing a field in the configured field list or a Number representing the 1-based position of the field in the specified field schema. (In case a field list was specified, passing the field position is also possible).
802    ///
803    /// # Returns
804    /// The current value for the specified field of the specified key within the specified item (possibly `None`), or `None` if the specified key has not been added yet (note that it might have been added and eventually deleted).
805    pub fn get_command_value(
806        &self,
807        item_pos: usize,
808        key: &str,
809        field_pos: usize,
810    ) -> Option<&String> {
811        let key = format!("{}_{}", item_pos, key);
812        self.command_values
813            .get(&key)
814            .and_then(|fields| fields.get(&field_pos))
815    }
816
817    /// Inquiry method that checks if the Subscription is currently "active" or not. Most of the Subscription properties cannot be modified if a Subscription is "active".
818    ///
819    /// The status of a Subscription is changed to "active" through the `LightstreamerClient.subscribe()` method and back to "inactive" through the `LightstreamerClient.unsubscribe()` one.
820    ///
821    /// # Lifecycle
822    /// This method can be called at any time.
823    ///
824    /// # Returns
825    /// `true`/`false` if the Subscription is "active" or not.
826    ///
827    /// # See also
828    /// `LightstreamerClient.subscribe()`
829    ///
830    /// # See also
831    /// `LightstreamerClient.unsubscribe()`
832    pub fn is_active(&self) -> bool {
833        self.is_active
834    }
835
836    /// Inquiry method that checks if the Subscription is currently subscribed to through the server or not.
837    ///
838    /// This flag is switched to true by server sent Subscription events, and back to false in case of client disconnection, `LightstreamerClient.unsubscribe()` calls and server sent unsubscription events.
839    ///
840    /// # Lifecycle
841    /// This method can be called at any time.
842    ///
843    /// # Returns
844    /// `true`/`false` if the Subscription is subscribed to through the server or not.
845    pub fn is_subscribed(&self) -> bool {
846        self.is_subscribed
847    }
848
849    /// Returns the position of the "key" field in a COMMAND Subscription.
850    ///
851    /// This method can only be used if the Subscription mode is COMMAND and the Subscription was initialized using a "Field Schema".
852    ///
853    /// # Lifecycle
854    /// This method can be called at any time after the first `SubscriptionListener.onSubscription()` event.
855    ///
856    /// # Errors
857    /// - Returns an error if the Subscription mode is not COMMAND or if the `SubscriptionListener.onSubscription()` event for this Subscription was not yet fired.
858    /// - Returns an error if a "Field List" was specified.
859    ///
860    /// # Returns
861    /// The 1-based position of the "key" field within the "Field Schema".
862    pub fn get_key_position(&self) -> Option<usize> {
863        if self.mode != SubscriptionMode::Command || !self.is_subscribed {
864            return None;
865        }
866        if let Some(ref schema) = self.field_schema {
867            return schema.split(',').position(|field| field.trim() == "key");
868        }
869        None
870    }
871
872    /// Returns the position of the "command" field in a COMMAND Subscription.
873    ///
874    /// This method can only be used if the Subscription mode is COMMAND and the Subscription was initialized using a "Field Schema".
875    ///
876    /// # Lifecycle
877    /// This method can be called at any time after the first `SubscriptionListener.onSubscription()` event.
878    ///
879    /// # Errors
880    /// - Returns an error if the Subscription mode is not COMMAND or if the `SubscriptionListener.onSubscription()` event for this Subscription was not yet fired.
881    ///
882    /// # Returns
883    /// The 1-based position of the "command" field within the "Field Schema".
884    pub fn get_command_position(&self) -> Option<usize> {
885        if self.mode != SubscriptionMode::Command || !self.is_subscribed {
886            return None;
887        }
888        if let Some(ref schema) = self.field_schema {
889            return schema
890                .split(',')
891                .position(|field| field.trim() == "command");
892        }
893        None
894    }
895
896    /*
897    /// Handles the subscription event.
898    pub fn on_subscription(&mut self) {
899        self.is_subscribed = true;
900        for listener in &mut self.listeners {
901            listener.on_subscription();
902        }
903    }
904
905    /// Handles the unsubscription event.
906    pub fn on_unsubscription(&mut self) {
907        self.is_subscribed = false;
908        self.values.clear();
909        self.command_values.clear();
910        for listener in &mut self.listeners {
911            listener.on_unsubscription();
912        }
913    }
914
915    /// Handles an update event for a regular Subscription.
916    pub fn on_update(&mut self, item_pos: usize, field_pos: usize, value: String, is_snapshot: bool) {
917        self.values.insert((item_pos, field_pos), value.clone());
918        for listener in &mut self.listeners {
919            listener.on_update(item_pos, field_pos, &value, is_snapshot);
920        }
921    }
922
923    /// Handles an update event for a COMMAND Subscription.
924    pub fn on_command_update(&mut self, key: String, item_pos: usize, field_pos: usize, value: String, is_snapshot: bool) {
925        self.command_values
926            .entry(key.clone())
927            .or_insert_with(HashMap::new)
928            .insert(field_pos, value.clone());
929        for listener in &mut self.listeners {
930            listener.on_command_update(&key, item_pos, field_pos, &value, is_snapshot);
931        }
932    }
933    */
934}
935
936impl Debug for Subscription {
937    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
938        f.debug_struct("Subscription")
939            .field("mode", &self.mode)
940            .field("item_group", &self.item_group)
941            .field("items", &self.items)
942            .field("field_schema", &self.field_schema)
943            .field("fields", &self.fields)
944            .field("data_adapter", &self.data_adapter)
945            .field(
946                "command_second_level_data_adapter",
947                &self.command_second_level_data_adapter,
948            )
949            .field(
950                "command_second_level_field_schema",
951                &self.command_second_level_field_schema,
952            )
953            .field(
954                "command_second_level_fields",
955                &self.command_second_level_fields,
956            )
957            .field("requested_buffer_size", &self.requested_buffer_size)
958            .field("requested_max_frequency", &self.requested_max_frequency)
959            .field("requested_snapshot", &self.requested_snapshot)
960            .field("selector", &self.selector)
961            .field("is_active", &self.is_active)
962            .field("is_subscribed", &self.is_subscribed)
963            .finish()
964    }
965}
966
967#[cfg(test)]
968mod tests {
969    use super::*;
970    use crate::subscription::ItemUpdate;
971    use crate::utils::LightstreamerError;
972    use std::sync::{Arc, Mutex};
973
/// Listener double that records which callbacks have been invoked.
struct MockSubscriptionListener {
    /// Flag associated with the subscription callback.
    subscription_called: Arc<Mutex<bool>>,
    /// Flag associated with the unsubscription callback.
    unsubscription_called: Arc<Mutex<bool>>,
    /// Flag associated with the item update callback.
    item_update_called: Arc<Mutex<bool>>,
}

impl MockSubscriptionListener {
    /// Builds a mock whose flags are all `false`, each behind its own `Arc<Mutex<_>>`.
    fn new() -> Self {
        let cleared_flag = || Arc::new(Mutex::new(false));
        Self {
            subscription_called: cleared_flag(),
            unsubscription_called: cleared_flag(),
            item_update_called: cleared_flag(),
        }
    }
}
989
990    impl SubscriptionListener for MockSubscriptionListener {
991        fn on_subscription(&mut self) {
992            if let Ok(mut guard) = self.subscription_called.lock() {
993                *guard = true;
994            }
995        }
996
997        fn on_unsubscription(&mut self) {
998            if let Ok(mut guard) = self.unsubscription_called.lock() {
999                *guard = true;
1000            }
1001        }
1002
1003        fn on_item_update(&self, _update: &ItemUpdate) {
1004            if let Ok(mut guard) = self.item_update_called.lock() {
1005                *guard = true;
1006            }
1007        }
1008    }
1009
#[test]
fn test_new_subscription() -> Result<(), LightstreamerError> {
    // A subscription built from explicit lists must expose them unchanged.
    let subscription = Subscription::new(
        SubscriptionMode::Merge,
        Some(vec!["item1".to_string(), "item2".to_string()]),
        Some(vec!["field1".to_string(), "field2".to_string()]),
    )?;

    assert_eq!(*subscription.get_mode(), SubscriptionMode::Merge);
    // A missing list is a failure: the assertions must not be silently skipped.
    match subscription.get_items() {
        Some(items) => assert_eq!(items, &vec!["item1".to_string(), "item2".to_string()]),
        None => panic!("Expected the items passed to the constructor"),
    }
    match subscription.get_fields() {
        Some(fields) => assert_eq!(fields, &vec!["field1".to_string(), "field2".to_string()]),
        None => panic!("Expected the fields passed to the constructor"),
    }

    // Construction without items is rejected.
    let subscription = Subscription::new(
        SubscriptionMode::Merge,
        None,
        Some(vec!["field1".to_string()]),
    );
    assert!(subscription.is_err());

    // Construction without fields is rejected.
    let subscription = Subscription::new(
        SubscriptionMode::Merge,
        Some(vec!["item1".to_string()]),
        None,
    );
    assert!(subscription.is_err());
    Ok(())
}
1041
#[test]
fn test_add_and_remove_listener() -> Result<(), LightstreamerError> {
    let mut sub = Subscription::new(
        SubscriptionMode::Merge,
        Some(vec!["item1".to_string()]),
        Some(vec!["field1".to_string()]),
    )?;
    // A fresh subscription has no listeners.
    assert_eq!(sub.get_listeners().len(), 0);

    // Registering a listener grows the list.
    sub.add_listener(Box::new(MockSubscriptionListener::new()));
    assert_eq!(sub.get_listeners().len(), 1);

    // Asking to remove a listener that was never registered leaves the list as it is.
    let unregistered = MockSubscriptionListener::new();
    sub.remove_listener(&unregistered);
    assert_eq!(sub.get_listeners().len(), 1);

    // The same listener can then be registered as a second entry.
    sub.add_listener(Box::new(unregistered));
    assert_eq!(sub.get_listeners().len(), 2);
    Ok(())
}
1063
#[test]
fn test_set_items() -> Result<(), LightstreamerError> {
    let mut subscription = Subscription::new(
        SubscriptionMode::Merge,
        Some(vec!["item1".to_string()]),
        Some(vec!["field1".to_string()]),
    )?;

    let result = subscription.set_items(vec!["new_item1".to_string(), "new_item2".to_string()]);
    assert!(result.is_ok());

    // A missing list is a failure: the assertion must not be silently skipped.
    match subscription.get_items() {
        Some(items) => assert_eq!(
            items,
            &vec!["new_item1".to_string(), "new_item2".to_string()]
        ),
        None => panic!("Expected the items that were just set"),
    }

    // Once active, the item list can no longer be replaced.
    subscription.is_active = true;

    let result = subscription.set_items(vec!["another_item".to_string()]);
    assert!(result.is_err());
    Ok(())
}
1088
#[test]
fn test_set_fields() -> Result<(), LightstreamerError> {
    let mut subscription = Subscription::new(
        SubscriptionMode::Merge,
        Some(vec!["item1".to_string()]),
        Some(vec!["field1".to_string()]),
    )?;

    let result =
        subscription.set_fields(vec!["new_field1".to_string(), "new_field2".to_string()]);
    assert!(result.is_ok());

    // A missing list is a failure: the assertion must not be silently skipped.
    match subscription.get_fields() {
        Some(fields) => assert_eq!(
            fields,
            &vec!["new_field1".to_string(), "new_field2".to_string()]
        ),
        None => panic!("Expected the fields that were just set"),
    }

    // Once active, the field list can no longer be replaced.
    subscription.is_active = true;

    let result = subscription.set_fields(vec!["another_field".to_string()]);
    assert!(result.is_err());
    Ok(())
}
1114
#[test]
fn test_set_item_group() -> Result<(), LightstreamerError> {
    let mut subscription = Subscription::new(
        SubscriptionMode::Merge,
        Some(vec!["item1".to_string()]),
        Some(vec!["field1".to_string()]),
    )?;

    let result = subscription.set_item_group("group1".to_string());
    assert!(result.is_ok());

    // A missing group is a failure: the assertion must not be silently skipped.
    match subscription.get_item_group() {
        Some(group) => assert_eq!(group, "group1"),
        None => panic!("Expected the item group that was just set"),
    }

    // Once active, the item group can no longer be replaced.
    subscription.is_active = true;

    let result = subscription.set_item_group("another_group".to_string());
    assert!(result.is_err());
    Ok(())
}
1136
#[test]
fn test_set_field_schema() -> Result<(), LightstreamerError> {
    let mut subscription = Subscription::new(
        SubscriptionMode::Merge,
        Some(vec!["item1".to_string()]),
        Some(vec!["field1".to_string()]),
    )?;

    let result = subscription.set_field_schema("schema1".to_string());
    assert!(result.is_ok());

    // A missing schema is a failure: the assertion must not be silently skipped.
    match subscription.get_field_schema() {
        Some(schema) => assert_eq!(schema, "schema1"),
        None => panic!("Expected the field schema that was just set"),
    }

    // Once active, the field schema can no longer be replaced.
    subscription.is_active = true;

    let result = subscription.set_field_schema("another_schema".to_string());
    assert!(result.is_err());
    Ok(())
}
1158
#[test]
fn test_set_data_adapter() -> Result<(), LightstreamerError> {
    let mut subscription = Subscription::new(
        SubscriptionMode::Merge,
        Some(vec!["item1".to_string()]),
        Some(vec!["field1".to_string()]),
    )?;

    let result = subscription.set_data_adapter(Some("adapter1".to_string()));
    assert!(result.is_ok());
    // A missing adapter is a failure: the assertion must not be silently skipped.
    match subscription.get_data_adapter() {
        Some(adapter) => assert_eq!(adapter, "adapter1"),
        None => panic!("Expected the data adapter that was just set"),
    }

    // Once active, the data adapter can no longer be replaced.
    subscription.is_active = true;

    let result = subscription.set_data_adapter(Some("another_adapter".to_string()));
    assert!(result.is_err());
    Ok(())
}
1179
#[test]
fn test_set_requested_snapshot() -> Result<(), LightstreamerError> {
    let mut subscription = Subscription::new(
        SubscriptionMode::Merge,
        Some(vec!["item1".to_string()]),
        Some(vec!["field1".to_string()]),
    )?;

    let result = subscription.set_requested_snapshot(Some(Snapshot::Yes));
    assert!(result.is_ok());

    // Compare the whole Option so that a missing value fails the test.
    assert_eq!(subscription.get_requested_snapshot(), Some(&Snapshot::Yes));

    // Once active, the preference can no longer be replaced.
    subscription.is_active = true;

    let result = subscription.set_requested_snapshot(Some(Snapshot::No));
    assert!(result.is_err());
    // The rejected call must leave the stored preference untouched.
    assert_eq!(subscription.get_requested_snapshot(), Some(&Snapshot::Yes));
    Ok(())
}
1204
#[test]
fn test_set_requested_buffer_size() -> Result<(), LightstreamerError> {
    let mut subscription = Subscription::new(
        SubscriptionMode::Merge,
        Some(vec!["item1".to_string()]),
        Some(vec!["field1".to_string()]),
    )?;

    let result = subscription.set_requested_buffer_size(Some(10));
    assert!(result.is_ok());

    // Compare the whole Option so that a missing value fails the test.
    assert_eq!(subscription.get_requested_buffer_size(), Some(&10));

    // Once active, the buffer size can no longer be replaced.
    subscription.is_active = true;

    let result = subscription.set_requested_buffer_size(Some(20));
    assert!(result.is_err());
    // The rejected call must leave the stored size untouched.
    assert_eq!(subscription.get_requested_buffer_size(), Some(&10));
    Ok(())
}
1226
#[test]
fn test_set_requested_max_frequency() -> Result<(), LightstreamerError> {
    let mut subscription = Subscription::new(
        SubscriptionMode::Merge,
        Some(vec!["item1".to_string()]),
        Some(vec!["field1".to_string()]),
    )?;

    let result = subscription.set_requested_max_frequency(Some(10.5));
    assert!(result.is_ok());

    // Compare the whole Option so that a missing value fails the test.
    assert_eq!(subscription.get_requested_max_frequency(), Some(&10.5));

    subscription.is_active = true;

    // While active, a stored frequency can be replaced by another number.
    let result = subscription.set_requested_max_frequency(Some(20.5));
    assert!(result.is_ok());
    assert_eq!(subscription.get_requested_max_frequency(), Some(&20.5));

    // While active, the frequency cannot be changed when none is currently stored.
    subscription.requested_max_frequency = None;
    let result = subscription.set_requested_max_frequency(Some(30.5));
    assert!(result.is_err());
    assert_eq!(subscription.get_requested_max_frequency(), None);
    Ok(())
}
1252
#[test]
fn test_set_selector() -> Result<(), LightstreamerError> {
    let mut subscription = Subscription::new(
        SubscriptionMode::Merge,
        Some(vec!["item1".to_string()]),
        Some(vec!["field1".to_string()]),
    )?;

    let result = subscription.set_selector(Some("selector1".to_string()));
    assert!(result.is_ok());

    // Compare the whole Option so that a missing value fails the test.
    assert_eq!(
        subscription.get_selector().map(String::as_str),
        Some("selector1")
    );

    // Once active, the selector can no longer be replaced.
    subscription.is_active = true;

    let result = subscription.set_selector(Some("another_selector".to_string()));
    assert!(result.is_err());
    // The rejected call must leave the stored selector untouched.
    assert_eq!(
        subscription.get_selector().map(String::as_str),
        Some("selector1")
    );
    Ok(())
}
1274
#[test]
fn test_command_second_level_methods() -> Result<(), LightstreamerError> {
    let mut subscription = Subscription::new(
        SubscriptionMode::Command,
        Some(vec!["item1".to_string()]),
        Some(vec!["field1".to_string()]),
    )?;

    // Second-level data adapter: accepted while inactive on a COMMAND subscription.
    let result =
        subscription.set_command_second_level_data_adapter(Some("adapter1".to_string()));
    assert!(result.is_ok());
    // A missing value is a failure: the assertion must not be silently skipped.
    match subscription.get_command_second_level_data_adapter() {
        Some(adapter) => assert_eq!(adapter, "adapter1"),
        None => panic!("Expected the second-level data adapter that was just set"),
    }

    // Rejected while active.
    subscription.is_active = true;

    let result =
        subscription.set_command_second_level_data_adapter(Some("adapter2".to_string()));
    assert!(result.is_err());

    subscription.is_active = false;

    // Second-level field list.
    let result = subscription.set_command_second_level_fields(Some(vec![
        "field1".to_string(),
        "field2".to_string(),
    ]));
    assert!(result.is_ok());
    match subscription.get_command_second_level_fields() {
        Some(fields) => assert_eq!(fields, &vec!["field1".to_string(), "field2".to_string()]),
        None => panic!("Expected the second-level fields that were just set"),
    }

    // Second-level field schema.
    let result =
        subscription.set_command_second_level_field_schema(Some("schema1".to_string()));
    assert!(result.is_ok());
    match subscription.get_command_second_level_field_schema() {
        Some(schema) => assert_eq!(schema, "schema1"),
        None => panic!("Expected the second-level field schema that was just set"),
    }

    // None of the second-level setters is available outside COMMAND mode.
    let mut non_command_subscription = Subscription::new(
        SubscriptionMode::Merge,
        Some(vec!["item1".to_string()]),
        Some(vec!["field1".to_string()]),
    )?;

    let result = non_command_subscription
        .set_command_second_level_data_adapter(Some("adapter1".to_string()));
    assert!(result.is_err());

    let result = non_command_subscription
        .set_command_second_level_fields(Some(vec!["field1".to_string()]));
    assert!(result.is_err());

    let result = non_command_subscription
        .set_command_second_level_field_schema(Some("schema1".to_string()));
    assert!(result.is_err());
    Ok(())
}
1333
#[test]
fn test_is_active_and_is_subscribed() -> Result<(), LightstreamerError> {
    let fresh = Subscription::new(
        SubscriptionMode::Merge,
        Some(vec!["item1".to_string()]),
        Some(vec!["field1".to_string()]),
    )?;

    // A newly created subscription is neither active nor subscribed.
    let active = fresh.is_active();
    assert!(!active);
    let subscribed = fresh.is_subscribed();
    assert!(!subscribed);
    Ok(())
}
1346
/// Exercises `get_key_position` across three scenarios: a COMMAND
/// subscription whose schema contains `key`, a MERGE subscription, and a
/// COMMAND subscription whose schema lacks `key`.
#[test]
fn test_get_key_position() -> Result<(), LightstreamerError> {
    // Builds a single-item subscription with the given mode and field names.
    let make = |mode: SubscriptionMode, names: &[&str]| {
        let fields: Vec<String> = names.iter().map(|&name| name.to_owned()).collect();
        Subscription::new(mode, Some(vec![String::from("item1")]), Some(fields))
    };

    // Scenario 1: COMMAND mode, schema lists `key` first.
    let mut with_key = make(SubscriptionMode::Command, &["key", "command", "field1"])?;
    with_key.field_schema = Some(String::from("key,command,field1"));

    // While the subscribed flag is still off, no position is reported.
    assert_eq!(with_key.get_key_position(), None);

    // Once flagged as subscribed, the index of `key` (0) is returned.
    with_key.is_subscribed = true;
    assert_eq!(with_key.get_key_position(), Some(0));

    // Scenario 2: MERGE mode never reports a key position, even when subscribed.
    let mut merge_mode = make(SubscriptionMode::Merge, &["key", "field1"])?;
    merge_mode.field_schema = Some(String::from("key,field1"));
    merge_mode.is_subscribed = true;
    assert_eq!(merge_mode.get_key_position(), None);

    // Scenario 3: COMMAND mode, but the schema has no `key` entry.
    let mut without_key = make(SubscriptionMode::Command, &["command", "field1"])?;
    without_key.field_schema = Some(String::from("command,field1"));
    without_key.is_subscribed = true;
    assert_eq!(without_key.get_key_position(), None);

    Ok(())
}
1399
/// Exercises `get_command_position` across three scenarios: a COMMAND
/// subscription whose schema contains `command`, a MERGE subscription, and a
/// COMMAND subscription whose schema lacks `command`.
#[test]
fn test_get_command_position() -> Result<(), LightstreamerError> {
    // Builds a single-item subscription with the given mode and field names.
    let make = |mode: SubscriptionMode, names: &[&str]| {
        let fields: Vec<String> = names.iter().map(|&name| name.to_owned()).collect();
        Subscription::new(mode, Some(vec![String::from("item1")]), Some(fields))
    };

    // Scenario 1: COMMAND mode, schema lists `command` second.
    let mut with_command = make(SubscriptionMode::Command, &["key", "command", "field1"])?;
    with_command.field_schema = Some(String::from("key,command,field1"));

    // While the subscribed flag is still off, no position is reported.
    assert_eq!(with_command.get_command_position(), None);

    // Once flagged as subscribed, the index of `command` (1) is returned.
    with_command.is_subscribed = true;
    assert_eq!(with_command.get_command_position(), Some(1));

    // Scenario 2: MERGE mode never reports a command position, even when subscribed.
    let mut merge_mode = make(SubscriptionMode::Merge, &["command", "field1"])?;
    merge_mode.field_schema = Some(String::from("command,field1"));
    merge_mode.is_subscribed = true;
    assert_eq!(merge_mode.get_command_position(), None);

    // Scenario 3: COMMAND mode, but the schema has no `command` entry.
    let mut without_command = make(SubscriptionMode::Command, &["key", "field1"])?;
    without_command.field_schema = Some(String::from("key,field1"));
    without_command.is_subscribed = true;
    assert_eq!(without_command.get_command_position(), None);

    Ok(())
}
1452
/// Formats a subscription with `{:?}` and checks that the output mentions a
/// handful of expected field names.
#[test]
fn test_debug_implementation() -> Result<(), LightstreamerError> {
    let items = Some(vec![String::from("item1")]);
    let fields = Some(vec![String::from("field1")]);
    let subscription = Subscription::new(SubscriptionMode::Merge, items, fields)?;

    // Formatting itself must not panic.
    let rendered = format!("{subscription:?}");

    // Each of these names is expected to appear somewhere in the output.
    for needle in ["mode", "items", "fields", "is_active", "is_subscribed"] {
        assert!(rendered.contains(needle));
    }
    Ok(())
}
1472
/// Checks the `Display` text produced for every `Snapshot` variant.
#[test]
fn test_snapshot_display() {
    // (variant, expected Display output)
    let expectations = [
        (Snapshot::Yes, "true"),
        (Snapshot::No, "false"),
        (Snapshot::Number(5), "5"),
        (Snapshot::None, ""),
    ];

    for (snapshot, expected) in expectations {
        assert_eq!(snapshot.to_string(), expected);
    }
}
1481
/// Checks the `Display` text produced for the four `SubscriptionMode`
/// variants used here.
#[test]
fn test_subscription_mode_display() {
    // (variant, expected Display output)
    let expectations = [
        (SubscriptionMode::Merge, "MERGE"),
        (SubscriptionMode::Distinct, "DISTINCT"),
        (SubscriptionMode::Command, "COMMAND"),
        (SubscriptionMode::Raw, "RAW"),
    ];

    for (mode, expected) in expectations {
        assert_eq!(mode.to_string(), expected);
    }
}
1490
/// The `Default` implementation for `&Snapshot` must hand back a reference
/// to `Snapshot::None`.
#[test]
fn test_snapshot_default() {
    let borrowed_default = <&Snapshot>::default();
    assert!(matches!(*borrowed_default, Snapshot::None));
}
1497
/// Seeds `command_values` by hand and checks `get_command_value` for one
/// hit and three different kinds of miss.
#[test]
fn test_get_command_value() -> Result<(), LightstreamerError> {
    let field_names = ["key", "command", "field1"].map(String::from).to_vec();
    let mut subscription = Subscription::new(
        SubscriptionMode::Command,
        Some(vec![String::from("item1")]),
        Some(field_names),
    )?;

    // Store one value at field position 3 under the map key "1_test_key".
    let stored = HashMap::from([(3, String::from("test_value"))]);
    subscription
        .command_values
        .insert(String::from("1_test_key"), stored);

    // Hit: item 1, key "test_key", field position 3.
    let expected = String::from("test_value");
    assert_eq!(
        subscription.get_command_value(1, "test_key", 3),
        Some(&expected)
    );

    // Misses: unknown key, unknown field position, unknown item position.
    let misses = [
        (1, "non_existent_key", 3),
        (1, "test_key", 4),
        (2, "test_key", 3),
    ];
    for (item_pos, key, field_pos) in misses {
        assert_eq!(subscription.get_command_value(item_pos, key, field_pos), None);
    }
    Ok(())
}
1534
/// Passing `None` to `set_requested_buffer_size` must succeed, and the
/// getter must then report `None` as well.
#[test]
fn test_set_requested_buffer_size_with_unlimited() -> Result<(), LightstreamerError> {
    let items = Some(vec![String::from("item1")]);
    let fields = Some(vec![String::from("field1")]);
    let mut subscription = Subscription::new(SubscriptionMode::Merge, items, fields)?;

    // `None` is the value this test uses for the "unlimited" setting.
    assert!(subscription.set_requested_buffer_size(None).is_ok());
    assert_eq!(subscription.get_requested_buffer_size(), None);
    Ok(())
}
1549
/// The second-level field setters must fail on a non-COMMAND subscription
/// and on an active COMMAND subscription, with the error text checked in
/// each case.
#[test]
fn test_command_second_level_field_methods_with_invalid_inputs()
-> Result<(), LightstreamerError> {
    // Fresh copies of the arguments that are reused throughout the test.
    let single_item = || Some(vec![String::from("item1")]);
    let single_field = || Some(vec![String::from("field1")]);
    let schema_name = || Some(String::from("field1"));

    // --- Wrong mode: a MERGE subscription ---
    let mut merge_sub =
        Subscription::new(SubscriptionMode::Merge, single_item(), single_field())?;

    let fields_outcome = merge_sub.set_command_second_level_fields(single_field());
    assert!(fields_outcome.is_err());
    assert_eq!(fields_outcome.unwrap_err(), "Subscription mode is not Command");

    let schema_outcome = merge_sub.set_command_second_level_field_schema(schema_name());
    assert!(schema_outcome.is_err());
    assert_eq!(schema_outcome.unwrap_err(), "Subscription mode is not Command");

    // --- Right mode, but the subscription is flagged as active ---
    let mut active_sub =
        Subscription::new(SubscriptionMode::Command, single_item(), single_field())?;
    active_sub.is_active = true;

    let fields_outcome = active_sub.set_command_second_level_fields(single_field());
    assert!(fields_outcome.is_err());
    assert_eq!(fields_outcome.unwrap_err(), "Subscription is active");

    let schema_outcome = active_sub.set_command_second_level_field_schema(schema_name());
    assert!(schema_outcome.is_err());
    if let Some(reason) = schema_outcome.err() {
        assert_eq!(reason, "Subscription is active");
    }
    Ok(())
}
1597
/// `set_data_adapter` must fail with "Subscription is active" once the
/// subscription has been flagged as active.
#[test]
fn test_set_data_adapter_with_invalid_inputs() -> Result<(), LightstreamerError> {
    let items = Some(vec![String::from("item1")]);
    let fields = Some(vec![String::from("field1")]);
    let mut subscription = Subscription::new(SubscriptionMode::Merge, items, fields)?;

    // Flip the flag directly so the setter sees an active subscription.
    subscription.is_active = true;

    let outcome = subscription.set_data_adapter(Some(String::from("adapter1")));
    assert!(outcome.is_err());
    if let Some(reason) = outcome.err() {
        assert_eq!(reason, "Subscription is active");
    }
    Ok(())
}
1617
/// `set_selector` must fail with "Subscription is active" once the
/// subscription has been flagged as active.
#[test]
fn test_set_selector_with_invalid_inputs() -> Result<(), LightstreamerError> {
    let items = Some(vec![String::from("item1")]);
    let fields = Some(vec![String::from("field1")]);
    let mut subscription = Subscription::new(SubscriptionMode::Merge, items, fields)?;

    // Flip the flag directly so the setter sees an active subscription.
    subscription.is_active = true;

    let outcome = subscription.set_selector(Some(String::from("selector1")));
    assert!(outcome.is_err());
    if let Some(reason) = outcome.err() {
        assert_eq!(reason, "Subscription is active");
    }
    Ok(())
}
1637}