Skip to main content

mq_rest_admin/
sync_ops.rs

1//! Synchronous start/stop/restart wrappers for MQ objects.
2
3use std::collections::HashMap;
4use std::thread;
5use std::time::{Duration, Instant};
6
7use serde_json::Value;
8
9use crate::error::{MqRestError, Result};
10use crate::session::MqRestSession;
11
/// Timing knobs shared by every synchronous start/stop/restart wrapper.
///
/// Both values are expressed in seconds. Prefer [`SyncConfig::new`] when the
/// values come from outside the program, because it validates them; the fields
/// are public, so a literal can also be written directly.
#[derive(Debug, Clone, Copy)]
pub struct SyncConfig {
    /// Upper bound, in wall-clock seconds, on how long a wrapper keeps polling
    /// for the target state before giving up.
    pub timeout_seconds: f64,
    /// Pause, in seconds, taken before each status poll.
    pub poll_interval_seconds: f64,
}
20
21impl SyncConfig {
22    /// Create a new `SyncConfig` with validated parameters.
23    ///
24    /// # Errors
25    ///
26    /// Returns [`MqRestError::InvalidConfig`] if either value is not positive.
27    pub fn new(timeout_seconds: f64, poll_interval_seconds: f64) -> Result<Self> {
28        check_positive("timeout_seconds", timeout_seconds)?;
29        check_positive("poll_interval_seconds", poll_interval_seconds)?;
30        Ok(Self {
31            timeout_seconds,
32            poll_interval_seconds,
33        })
34    }
35}
36
37impl Default for SyncConfig {
38    fn default() -> Self {
39        Self {
40            timeout_seconds: 30.0,
41            poll_interval_seconds: 1.0,
42        }
43    }
44}
45
/// Which kind of synchronous operation a [`SyncResult`] describes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncOperation {
    /// A start was issued and a running state was then observed.
    Started,
    /// A stop was issued and a stopped state was then observed.
    Stopped,
    /// A stop phase followed by a start phase both completed.
    Restarted,
}
56
57/// Result of a synchronous start/stop/restart operation.
58#[derive(Debug, Clone)]
59pub struct SyncResult {
60    /// The operation that was performed.
61    pub operation: SyncOperation,
62    /// Total number of status polls issued.
63    pub polls: u32,
64    /// Total wall-clock seconds from command to target state confirmation.
65    pub elapsed_seconds: f64,
66}
67
/// Per-object-type constants that drive the shared start/stop polling loops.
struct ObjectTypeConfig {
    /// Qualifier sent with the `START` command.
    start_qualifier: &'static str,
    /// Qualifier sent with the `STOP` command.
    stop_qualifier: &'static str,
    /// Qualifier sent with the `DISPLAY` command used for status polling.
    status_qualifier: &'static str,
    /// Row keys inspected, in order, when looking for the status value.
    status_keys: &'static [&'static str],
    /// When `true`, a status poll that returns no rows counts as "stopped".
    empty_means_stopped: bool,
}
75
76const CHANNEL_CONFIG: ObjectTypeConfig = ObjectTypeConfig {
77    start_qualifier: "CHANNEL",
78    stop_qualifier: "CHANNEL",
79    status_qualifier: "CHSTATUS",
80    status_keys: &["channel_status", "STATUS"],
81    empty_means_stopped: true,
82};
83
84const LISTENER_CONFIG: ObjectTypeConfig = ObjectTypeConfig {
85    start_qualifier: "LISTENER",
86    stop_qualifier: "LISTENER",
87    status_qualifier: "LSSTATUS",
88    status_keys: &["status", "STATUS"],
89    empty_means_stopped: false,
90};
91
92const SERVICE_CONFIG: ObjectTypeConfig = ObjectTypeConfig {
93    start_qualifier: "SERVICE",
94    stop_qualifier: "SERVICE",
95    status_qualifier: "SVSTATUS",
96    status_keys: &["status", "STATUS"],
97    empty_means_stopped: false,
98};
99
/// Status strings accepted as "running" (upper- and lower-case spellings).
const RUNNING_VALUES: &[&str] = &["RUNNING", "running"];
/// Status strings accepted as "stopped" (upper- and lower-case spellings).
const STOPPED_VALUES: &[&str] = &["STOPPED", "stopped"];
102
103impl MqRestSession {
104    // ---- Channel ----
105
106    /// Start a channel and wait until it is running.
107    ///
108    /// # Errors
109    ///
110    /// Returns an error if the START command fails or the channel does not
111    /// reach RUNNING within the timeout.
112    pub fn start_channel_sync(
113        &mut self,
114        name: &str,
115        config: Option<SyncConfig>,
116    ) -> Result<SyncResult> {
117        start_and_poll(self, name, &CHANNEL_CONFIG, config)
118    }
119
120    /// Stop a channel and wait until it is stopped.
121    ///
122    /// # Errors
123    ///
124    /// Returns an error if the STOP command fails or the channel does not
125    /// reach STOPPED within the timeout.
126    pub fn stop_channel_sync(
127        &mut self,
128        name: &str,
129        config: Option<SyncConfig>,
130    ) -> Result<SyncResult> {
131        stop_and_poll(self, name, &CHANNEL_CONFIG, config)
132    }
133
134    /// Stop then start a channel, waiting for each phase.
135    ///
136    /// # Errors
137    ///
138    /// Returns an error if either the stop or start phase fails or times out.
139    pub fn restart_channel(
140        &mut self,
141        name: &str,
142        config: Option<SyncConfig>,
143    ) -> Result<SyncResult> {
144        restart(self, name, &CHANNEL_CONFIG, config)
145    }
146
147    // ---- Listener ----
148
149    /// Start a listener and wait until it is running.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if the START command fails or the listener does not
154    /// reach RUNNING within the timeout.
155    pub fn start_listener_sync(
156        &mut self,
157        name: &str,
158        config: Option<SyncConfig>,
159    ) -> Result<SyncResult> {
160        start_and_poll(self, name, &LISTENER_CONFIG, config)
161    }
162
163    /// Stop a listener and wait until it is stopped.
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if the STOP command fails or the listener does not
168    /// reach STOPPED within the timeout.
169    pub fn stop_listener_sync(
170        &mut self,
171        name: &str,
172        config: Option<SyncConfig>,
173    ) -> Result<SyncResult> {
174        stop_and_poll(self, name, &LISTENER_CONFIG, config)
175    }
176
177    /// Stop then start a listener, waiting for each phase.
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if either the stop or start phase fails or times out.
182    pub fn restart_listener(
183        &mut self,
184        name: &str,
185        config: Option<SyncConfig>,
186    ) -> Result<SyncResult> {
187        restart(self, name, &LISTENER_CONFIG, config)
188    }
189
190    // ---- Service ----
191
192    /// Start a service and wait until it is running.
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if the START command fails or the service does not
197    /// reach RUNNING within the timeout.
198    pub fn start_service_sync(
199        &mut self,
200        name: &str,
201        config: Option<SyncConfig>,
202    ) -> Result<SyncResult> {
203        start_and_poll(self, name, &SERVICE_CONFIG, config)
204    }
205
206    /// Stop a service and wait until it is stopped.
207    ///
208    /// # Errors
209    ///
210    /// Returns an error if the STOP command fails or the service does not
211    /// reach STOPPED within the timeout.
212    pub fn stop_service_sync(
213        &mut self,
214        name: &str,
215        config: Option<SyncConfig>,
216    ) -> Result<SyncResult> {
217        stop_and_poll(self, name, &SERVICE_CONFIG, config)
218    }
219
220    /// Stop then start a service, waiting for each phase.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if either the stop or start phase fails or times out.
225    pub fn restart_service(
226        &mut self,
227        name: &str,
228        config: Option<SyncConfig>,
229    ) -> Result<SyncResult> {
230        restart(self, name, &SERVICE_CONFIG, config)
231    }
232}
233
234fn start_and_poll(
235    session: &mut MqRestSession,
236    name: &str,
237    obj_config: &ObjectTypeConfig,
238    config: Option<SyncConfig>,
239) -> Result<SyncResult> {
240    let sync_config = config.unwrap_or_default();
241    session.mqsc_command(
242        "START",
243        obj_config.start_qualifier,
244        Some(name),
245        None,
246        None,
247        None,
248    )?;
249    let mut polls = 0u32;
250    let start_time = Instant::now();
251    loop {
252        thread::sleep(Duration::from_secs_f64(sync_config.poll_interval_seconds));
253        let all_params: &[&str] = &["all"];
254        let status_rows = session.mqsc_command(
255            "DISPLAY",
256            obj_config.status_qualifier,
257            Some(name),
258            None,
259            Some(all_params),
260            None,
261        )?;
262        polls += 1;
263        if has_status(&status_rows, obj_config.status_keys, RUNNING_VALUES) {
264            let elapsed = start_time.elapsed().as_secs_f64();
265            return Ok(SyncResult {
266                operation: SyncOperation::Started,
267                polls,
268                elapsed_seconds: elapsed,
269            });
270        }
271        let elapsed = start_time.elapsed().as_secs_f64();
272        if elapsed >= sync_config.timeout_seconds {
273            return Err(MqRestError::Timeout {
274                name: name.into(),
275                operation: "start".into(),
276                elapsed,
277                message: format!(
278                    "{} '{}' did not reach RUNNING within {}s",
279                    obj_config.start_qualifier, name, sync_config.timeout_seconds
280                ),
281            });
282        }
283    }
284}
285
286fn stop_and_poll(
287    session: &mut MqRestSession,
288    name: &str,
289    obj_config: &ObjectTypeConfig,
290    config: Option<SyncConfig>,
291) -> Result<SyncResult> {
292    let sync_config = config.unwrap_or_default();
293    session.mqsc_command(
294        "STOP",
295        obj_config.stop_qualifier,
296        Some(name),
297        None,
298        None,
299        None,
300    )?;
301    let mut polls = 0u32;
302    let start_time = Instant::now();
303    loop {
304        thread::sleep(Duration::from_secs_f64(sync_config.poll_interval_seconds));
305        let all_params: &[&str] = &["all"];
306        let status_rows = session.mqsc_command(
307            "DISPLAY",
308            obj_config.status_qualifier,
309            Some(name),
310            None,
311            Some(all_params),
312            None,
313        )?;
314        polls += 1;
315        if obj_config.empty_means_stopped && status_rows.is_empty() {
316            let elapsed = start_time.elapsed().as_secs_f64();
317            return Ok(SyncResult {
318                operation: SyncOperation::Stopped,
319                polls,
320                elapsed_seconds: elapsed,
321            });
322        }
323        if has_status(&status_rows, obj_config.status_keys, STOPPED_VALUES) {
324            let elapsed = start_time.elapsed().as_secs_f64();
325            return Ok(SyncResult {
326                operation: SyncOperation::Stopped,
327                polls,
328                elapsed_seconds: elapsed,
329            });
330        }
331        let elapsed = start_time.elapsed().as_secs_f64();
332        if elapsed >= sync_config.timeout_seconds {
333            return Err(MqRestError::Timeout {
334                name: name.into(),
335                operation: "stop".into(),
336                elapsed,
337                message: format!(
338                    "{} '{}' did not reach STOPPED within {}s",
339                    obj_config.stop_qualifier, name, sync_config.timeout_seconds
340                ),
341            });
342        }
343    }
344}
345
346fn restart(
347    session: &mut MqRestSession,
348    name: &str,
349    obj_config: &ObjectTypeConfig,
350    config: Option<SyncConfig>,
351) -> Result<SyncResult> {
352    let stop_result = stop_and_poll(session, name, obj_config, config)?;
353    let start_result = start_and_poll(session, name, obj_config, config)?;
354    Ok(SyncResult {
355        operation: SyncOperation::Restarted,
356        polls: stop_result.polls + start_result.polls,
357        elapsed_seconds: stop_result.elapsed_seconds + start_result.elapsed_seconds,
358    })
359}
360
361fn check_positive(field: &str, value: f64) -> Result<()> {
362    if value > 0.0 {
363        return Ok(());
364    }
365    Err(MqRestError::InvalidConfig {
366        message: format!("{field} must be positive, got {value}"),
367    })
368}
369
370fn has_status(
371    rows: &[HashMap<String, Value>],
372    status_keys: &[&str],
373    target_values: &[&str],
374) -> bool {
375    for row in rows {
376        for key in status_keys {
377            if let Some(Value::String(value)) = row.get(*key)
378                && target_values.contains(&value.as_str())
379            {
380                return true;
381            }
382        }
383    }
384    false
385}
386
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::{
        MockTransport, empty_success_response, mock_session, success_response,
    };
    use serde_json::json;

    /// Poll responses queued for tests that expect a timeout.
    ///
    /// `fast_config` allows 0.5s at 0.01s per poll (roughly 50 polls), so 60
    /// queued responses leave headroom before the mock runs out of responses.
    const TIMEOUT_POLL_RESPONSES: usize = 60;

    /// Short timeout and poll interval so polling tests finish quickly.
    fn fast_config() -> SyncConfig {
        SyncConfig {
            timeout_seconds: 0.5,
            poll_interval_seconds: 0.01,
        }
    }

    /// Successful response holding a single row with `key` set to `value`.
    fn status_response(key: &str, value: &str) -> crate::transport::TransportResponse {
        let mut params = HashMap::new();
        params.insert(key.into(), json!(value));
        success_response(vec![params])
    }

    /// Response script for timeout tests: one empty success for the initial
    /// command, then `TIMEOUT_POLL_RESPONSES` poll responses built by `poll`.
    fn timeout_script(
        poll: impl FnMut() -> crate::transport::TransportResponse,
    ) -> Vec<crate::transport::TransportResponse> {
        std::iter::once(empty_success_response())
            .chain(std::iter::repeat_with(poll).take(TIMEOUT_POLL_RESPONSES))
            .collect()
    }

    // ---- SyncConfig::default ----

    #[test]
    fn sync_config_default_values() {
        let config = SyncConfig::default();
        assert!((config.timeout_seconds - 30.0).abs() < f64::EPSILON);
        assert!((config.poll_interval_seconds - 1.0).abs() < f64::EPSILON);
    }

    // ---- has_status ----

    #[test]
    fn has_status_match_first_key() {
        let mut row = HashMap::new();
        row.insert("channel_status".into(), json!("RUNNING"));
        assert!(has_status(
            &[row],
            &["channel_status", "STATUS"],
            &["RUNNING"]
        ));
    }

    #[test]
    fn has_status_match_second_key() {
        let mut row = HashMap::new();
        row.insert("STATUS".into(), json!("STOPPED"));
        assert!(has_status(
            &[row],
            &["channel_status", "STATUS"],
            &["STOPPED"]
        ));
    }

    #[test]
    fn has_status_no_match() {
        let mut row = HashMap::new();
        row.insert("STATUS".into(), json!("STARTING"));
        assert!(!has_status(
            &[row],
            &["channel_status", "STATUS"],
            &["RUNNING"]
        ));
    }

    #[test]
    fn has_status_empty_rows() {
        assert!(!has_status(&[], &["STATUS"], &["RUNNING"]));
    }

    #[test]
    fn has_status_non_string_value() {
        let mut row = HashMap::new();
        row.insert("STATUS".into(), json!(42));
        assert!(!has_status(&[row], &["STATUS"], &["RUNNING"]));
    }

    // ---- start_channel_sync ----

    #[test]
    fn start_channel_sync_first_poll_running() {
        let transport = MockTransport::new(vec![
            empty_success_response(),
            status_response("channel_status", "RUNNING"),
        ]);
        let mut session = mock_session(transport);
        let result = session
            .start_channel_sync("MY.CH", Some(fast_config()))
            .unwrap();
        assert_eq!(result.operation, SyncOperation::Started);
        assert!(result.polls >= 1);
    }

    #[test]
    fn start_channel_sync_timeout() {
        // START succeeds, then every poll keeps reporting STARTING.
        let transport = MockTransport::new(timeout_script(|| {
            status_response("channel_status", "STARTING")
        }));
        let mut session = mock_session(transport);
        let result = session.start_channel_sync("MY.CH", Some(fast_config()));
        assert!(matches!(result, Err(MqRestError::Timeout { .. })));
    }

    // ---- stop_channel_sync ----

    #[test]
    fn stop_channel_sync_returns_stopped_via_status() {
        let transport = MockTransport::new(vec![
            empty_success_response(),
            status_response("STATUS", "STOPPED"),
        ]);
        let mut session = mock_session(transport);
        let result = session
            .stop_channel_sync("MY.CH", Some(fast_config()))
            .unwrap();
        assert_eq!(result.operation, SyncOperation::Stopped);
    }

    #[test]
    fn stop_channel_sync_empty_means_stopped() {
        let transport =
            MockTransport::new(vec![empty_success_response(), empty_success_response()]);
        let mut session = mock_session(transport);
        let result = session
            .stop_channel_sync("MY.CH", Some(fast_config()))
            .unwrap();
        assert_eq!(result.operation, SyncOperation::Stopped);
    }

    // ---- stop_listener_sync ----

    #[test]
    fn stop_listener_sync_empty_rows_not_stopped() {
        // Listeners have empty_means_stopped=false, so empty rows mean timeout.
        let transport = MockTransport::new(timeout_script(empty_success_response));
        let mut session = mock_session(transport);
        let result = session.stop_listener_sync("MY.LIS", Some(fast_config()));
        assert!(matches!(result, Err(MqRestError::Timeout { .. })));
    }

    #[test]
    fn stop_listener_sync_stopped_status() {
        let transport = MockTransport::new(vec![
            empty_success_response(),
            status_response("status", "STOPPED"),
        ]);
        let mut session = mock_session(transport);
        let result = session
            .stop_listener_sync("MY.LIS", Some(fast_config()))
            .unwrap();
        assert_eq!(result.operation, SyncOperation::Stopped);
    }

    // ---- restart_channel ----

    #[test]
    fn restart_channel_both_phases_succeed() {
        let transport = MockTransport::new(vec![
            empty_success_response(),                     // STOP
            empty_success_response(),                     // poll → empty (stopped for channel)
            empty_success_response(),                     // START
            status_response("channel_status", "RUNNING"), // poll → RUNNING
        ]);
        let mut session = mock_session(transport);
        let result = session
            .restart_channel("MY.CH", Some(fast_config()))
            .unwrap();
        assert_eq!(result.operation, SyncOperation::Restarted);
        assert!(result.polls >= 2);
    }

    #[test]
    fn restart_channel_stop_phase_fails() {
        let transport = MockTransport::new(vec![]);
        let mut session = mock_session(transport);
        let result = session.restart_channel("MY.CH", Some(fast_config()));
        assert!(result.is_err());
    }

    #[test]
    fn restart_channel_start_phase_fails() {
        let transport = MockTransport::new(vec![
            empty_success_response(), // STOP
            empty_success_response(), // poll → empty (stopped for channel)
                                      // START fails - no response
        ]);
        let mut session = mock_session(transport);
        let result = session.restart_channel("MY.CH", Some(fast_config()));
        assert!(result.is_err());
    }

    // ---- Macro-generated per-method tests ----
    //
    // Each macro takes the method under test and the row key under which the
    // mocked status value is reported, and generates `test_<method>_ok`.

    macro_rules! test_start_sync {
        ($method:ident, $status_key:literal) => {
            paste::paste! {
                #[test]
                fn [<test_ $method _ok>]() {
                    let transport = MockTransport::new(vec![
                        empty_success_response(),
                        status_response($status_key, "RUNNING"),
                    ]);
                    let mut session = mock_session(transport);
                    let result = session.$method("OBJ", Some(fast_config())).unwrap();
                    assert_eq!(result.operation, SyncOperation::Started);
                }
            }
        };
    }

    macro_rules! test_stop_sync {
        ($method:ident, $status_key:literal) => {
            paste::paste! {
                #[test]
                fn [<test_ $method _ok>]() {
                    let transport = MockTransport::new(vec![
                        empty_success_response(),
                        status_response($status_key, "STOPPED"),
                    ]);
                    let mut session = mock_session(transport);
                    let result = session.$method("OBJ", Some(fast_config())).unwrap();
                    assert_eq!(result.operation, SyncOperation::Stopped);
                }
            }
        };
    }

    macro_rules! test_restart {
        ($method:ident, $status_key:literal) => {
            paste::paste! {
                #[test]
                fn [<test_ $method _ok>]() {
                    let transport = MockTransport::new(vec![
                        empty_success_response(),
                        status_response($status_key, "STOPPED"),
                        empty_success_response(),
                        status_response($status_key, "RUNNING"),
                    ]);
                    let mut session = mock_session(transport);
                    let result = session.$method("OBJ", Some(fast_config())).unwrap();
                    assert_eq!(result.operation, SyncOperation::Restarted);
                }
            }
        };
    }

    test_start_sync!(start_channel_sync, "channel_status");
    test_start_sync!(start_listener_sync, "status");
    test_start_sync!(start_service_sync, "status");

    test_stop_sync!(stop_channel_sync, "channel_status");
    test_stop_sync!(stop_listener_sync, "status");
    test_stop_sync!(stop_service_sync, "status");

    test_restart!(restart_channel, "channel_status");
    test_restart!(restart_listener, "status");
    test_restart!(restart_service, "status");

    #[test]
    fn start_channel_sync_start_command_fails() {
        let transport = MockTransport::new(vec![]);
        let mut session = mock_session(transport);
        let result = session.start_channel_sync("MY.CH", Some(fast_config()));
        assert!(result.is_err());
    }

    #[test]
    fn start_channel_sync_poll_fails() {
        // START succeeds but poll DISPLAY fails
        let transport = MockTransport::new(vec![
            empty_success_response(), // START ok
                                      // poll fails - no response
        ]);
        let mut session = mock_session(transport);
        let result = session.start_channel_sync("MY.CH", Some(fast_config()));
        assert!(result.is_err());
    }

    #[test]
    fn stop_channel_sync_stop_command_fails() {
        let transport = MockTransport::new(vec![]);
        let mut session = mock_session(transport);
        let result = session.stop_channel_sync("MY.CH", Some(fast_config()));
        assert!(result.is_err());
    }

    #[test]
    fn stop_channel_sync_poll_fails() {
        let transport = MockTransport::new(vec![
            empty_success_response(), // STOP ok
                                      // poll fails - no response
        ]);
        let mut session = mock_session(transport);
        let result = session.stop_channel_sync("MY.CH", Some(fast_config()));
        assert!(result.is_err());
    }

    // ---- SyncConfig validation ----

    #[test]
    fn sync_config_new_valid() {
        let config = SyncConfig::new(10.0, 0.5).unwrap();
        assert!((config.timeout_seconds - 10.0).abs() < f64::EPSILON);
        assert!((config.poll_interval_seconds - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn sync_config_new_zero_timeout_rejected() {
        let err = SyncConfig::new(0.0, 1.0).unwrap_err();
        assert!(
            format!("{err}").contains("timeout_seconds must be positive"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn sync_config_new_negative_timeout_rejected() {
        let err = SyncConfig::new(-1.0, 1.0).unwrap_err();
        assert!(
            format!("{err}").contains("timeout_seconds must be positive"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn sync_config_new_zero_poll_interval_rejected() {
        let err = SyncConfig::new(30.0, 0.0).unwrap_err();
        assert!(
            format!("{err}").contains("poll_interval_seconds must be positive"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn sync_config_new_negative_poll_interval_rejected() {
        let err = SyncConfig::new(30.0, -1.0).unwrap_err();
        assert!(
            format!("{err}").contains("poll_interval_seconds must be positive"),
            "unexpected error: {err}"
        );
    }
}
841            format!("{err}").contains("poll_interval_seconds must be positive"),
842            "unexpected error: {err}"
843        );
844    }
845}