1use std::collections::HashMap;
4use std::thread;
5use std::time::{Duration, Instant};
6
7use serde_json::Value;
8
9use crate::error::{MqRestError, Result};
10use crate::session::MqRestSession;
11
/// Timing parameters for the synchronous start/stop/restart helpers.
///
/// The fields are public, so a value can be written as a struct literal as
/// well as built through [`SyncConfig::new`], which additionally rejects
/// values that are not greater than zero.
#[derive(Clone, Copy, Debug)]
pub struct SyncConfig {
    /// Elapsed-time threshold, in seconds, after which a polling operation
    /// gives up and reports a timeout.
    pub timeout_seconds: f64,
    /// Pause, in seconds, taken before each status poll.
    pub poll_interval_seconds: f64,
}
20
21impl SyncConfig {
22 pub fn new(timeout_seconds: f64, poll_interval_seconds: f64) -> Result<Self> {
28 check_positive("timeout_seconds", timeout_seconds)?;
29 check_positive("poll_interval_seconds", poll_interval_seconds)?;
30 Ok(Self {
31 timeout_seconds,
32 poll_interval_seconds,
33 })
34 }
35}
36
37impl Default for SyncConfig {
38 fn default() -> Self {
39 Self {
40 timeout_seconds: 30.0,
41 poll_interval_seconds: 1.0,
42 }
43 }
44}
45
/// Which synchronous operation a [`SyncResult`] describes.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SyncOperation {
    /// The object was started and observed in a running state.
    Started,
    /// The object was stopped and observed in a stopped state.
    Stopped,
    /// The object was stopped and then started again.
    Restarted,
}
56
57#[derive(Debug, Clone)]
59pub struct SyncResult {
60 pub operation: SyncOperation,
62 pub polls: u32,
64 pub elapsed_seconds: f64,
66}
67
/// Per-object-type settings that drive the generic start/stop polling loops.
struct ObjectTypeConfig {
    /// Qualifier sent with the `START` command.
    start_qualifier: &'static str,
    /// Qualifier sent with the `STOP` command.
    stop_qualifier: &'static str,
    /// Qualifier sent with the `DISPLAY` command used to poll status.
    status_qualifier: &'static str,
    /// Row keys that are checked for the status value.
    status_keys: &'static [&'static str],
    /// When `true`, a status poll that returns no rows counts as stopped.
    empty_means_stopped: bool,
}
75
76const CHANNEL_CONFIG: ObjectTypeConfig = ObjectTypeConfig {
77 start_qualifier: "CHANNEL",
78 stop_qualifier: "CHANNEL",
79 status_qualifier: "CHSTATUS",
80 status_keys: &["channel_status", "STATUS"],
81 empty_means_stopped: true,
82};
83
84const LISTENER_CONFIG: ObjectTypeConfig = ObjectTypeConfig {
85 start_qualifier: "LISTENER",
86 stop_qualifier: "LISTENER",
87 status_qualifier: "LSSTATUS",
88 status_keys: &["status", "STATUS"],
89 empty_means_stopped: false,
90};
91
92const SERVICE_CONFIG: ObjectTypeConfig = ObjectTypeConfig {
93 start_qualifier: "SERVICE",
94 stop_qualifier: "SERVICE",
95 status_qualifier: "SVSTATUS",
96 status_keys: &["status", "STATUS"],
97 empty_means_stopped: false,
98};
99
/// Status strings that mean the object is running (both letter cases are listed).
const RUNNING_VALUES: &[&str] = &[
    "RUNNING",
    "running",
];
/// Status strings that mean the object is stopped (both letter cases are listed).
const STOPPED_VALUES: &[&str] = &[
    "STOPPED",
    "stopped",
];
102
103impl MqRestSession {
104 pub fn start_channel_sync(
113 &mut self,
114 name: &str,
115 config: Option<SyncConfig>,
116 ) -> Result<SyncResult> {
117 start_and_poll(self, name, &CHANNEL_CONFIG, config)
118 }
119
120 pub fn stop_channel_sync(
127 &mut self,
128 name: &str,
129 config: Option<SyncConfig>,
130 ) -> Result<SyncResult> {
131 stop_and_poll(self, name, &CHANNEL_CONFIG, config)
132 }
133
134 pub fn restart_channel(
140 &mut self,
141 name: &str,
142 config: Option<SyncConfig>,
143 ) -> Result<SyncResult> {
144 restart(self, name, &CHANNEL_CONFIG, config)
145 }
146
147 pub fn start_listener_sync(
156 &mut self,
157 name: &str,
158 config: Option<SyncConfig>,
159 ) -> Result<SyncResult> {
160 start_and_poll(self, name, &LISTENER_CONFIG, config)
161 }
162
163 pub fn stop_listener_sync(
170 &mut self,
171 name: &str,
172 config: Option<SyncConfig>,
173 ) -> Result<SyncResult> {
174 stop_and_poll(self, name, &LISTENER_CONFIG, config)
175 }
176
177 pub fn restart_listener(
183 &mut self,
184 name: &str,
185 config: Option<SyncConfig>,
186 ) -> Result<SyncResult> {
187 restart(self, name, &LISTENER_CONFIG, config)
188 }
189
190 pub fn start_service_sync(
199 &mut self,
200 name: &str,
201 config: Option<SyncConfig>,
202 ) -> Result<SyncResult> {
203 start_and_poll(self, name, &SERVICE_CONFIG, config)
204 }
205
206 pub fn stop_service_sync(
213 &mut self,
214 name: &str,
215 config: Option<SyncConfig>,
216 ) -> Result<SyncResult> {
217 stop_and_poll(self, name, &SERVICE_CONFIG, config)
218 }
219
220 pub fn restart_service(
226 &mut self,
227 name: &str,
228 config: Option<SyncConfig>,
229 ) -> Result<SyncResult> {
230 restart(self, name, &SERVICE_CONFIG, config)
231 }
232}
233
234fn start_and_poll(
235 session: &mut MqRestSession,
236 name: &str,
237 obj_config: &ObjectTypeConfig,
238 config: Option<SyncConfig>,
239) -> Result<SyncResult> {
240 let sync_config = config.unwrap_or_default();
241 session.mqsc_command(
242 "START",
243 obj_config.start_qualifier,
244 Some(name),
245 None,
246 None,
247 None,
248 )?;
249 let mut polls = 0u32;
250 let start_time = Instant::now();
251 loop {
252 thread::sleep(Duration::from_secs_f64(sync_config.poll_interval_seconds));
253 let all_params: &[&str] = &["all"];
254 let status_rows = session.mqsc_command(
255 "DISPLAY",
256 obj_config.status_qualifier,
257 Some(name),
258 None,
259 Some(all_params),
260 None,
261 )?;
262 polls += 1;
263 if has_status(&status_rows, obj_config.status_keys, RUNNING_VALUES) {
264 let elapsed = start_time.elapsed().as_secs_f64();
265 return Ok(SyncResult {
266 operation: SyncOperation::Started,
267 polls,
268 elapsed_seconds: elapsed,
269 });
270 }
271 let elapsed = start_time.elapsed().as_secs_f64();
272 if elapsed >= sync_config.timeout_seconds {
273 return Err(MqRestError::Timeout {
274 name: name.into(),
275 operation: "start".into(),
276 elapsed,
277 message: format!(
278 "{} '{}' did not reach RUNNING within {}s",
279 obj_config.start_qualifier, name, sync_config.timeout_seconds
280 ),
281 });
282 }
283 }
284}
285
286fn stop_and_poll(
287 session: &mut MqRestSession,
288 name: &str,
289 obj_config: &ObjectTypeConfig,
290 config: Option<SyncConfig>,
291) -> Result<SyncResult> {
292 let sync_config = config.unwrap_or_default();
293 session.mqsc_command(
294 "STOP",
295 obj_config.stop_qualifier,
296 Some(name),
297 None,
298 None,
299 None,
300 )?;
301 let mut polls = 0u32;
302 let start_time = Instant::now();
303 loop {
304 thread::sleep(Duration::from_secs_f64(sync_config.poll_interval_seconds));
305 let all_params: &[&str] = &["all"];
306 let status_rows = session.mqsc_command(
307 "DISPLAY",
308 obj_config.status_qualifier,
309 Some(name),
310 None,
311 Some(all_params),
312 None,
313 )?;
314 polls += 1;
315 if obj_config.empty_means_stopped && status_rows.is_empty() {
316 let elapsed = start_time.elapsed().as_secs_f64();
317 return Ok(SyncResult {
318 operation: SyncOperation::Stopped,
319 polls,
320 elapsed_seconds: elapsed,
321 });
322 }
323 if has_status(&status_rows, obj_config.status_keys, STOPPED_VALUES) {
324 let elapsed = start_time.elapsed().as_secs_f64();
325 return Ok(SyncResult {
326 operation: SyncOperation::Stopped,
327 polls,
328 elapsed_seconds: elapsed,
329 });
330 }
331 let elapsed = start_time.elapsed().as_secs_f64();
332 if elapsed >= sync_config.timeout_seconds {
333 return Err(MqRestError::Timeout {
334 name: name.into(),
335 operation: "stop".into(),
336 elapsed,
337 message: format!(
338 "{} '{}' did not reach STOPPED within {}s",
339 obj_config.stop_qualifier, name, sync_config.timeout_seconds
340 ),
341 });
342 }
343 }
344}
345
346fn restart(
347 session: &mut MqRestSession,
348 name: &str,
349 obj_config: &ObjectTypeConfig,
350 config: Option<SyncConfig>,
351) -> Result<SyncResult> {
352 let stop_result = stop_and_poll(session, name, obj_config, config)?;
353 let start_result = start_and_poll(session, name, obj_config, config)?;
354 Ok(SyncResult {
355 operation: SyncOperation::Restarted,
356 polls: stop_result.polls + start_result.polls,
357 elapsed_seconds: stop_result.elapsed_seconds + start_result.elapsed_seconds,
358 })
359}
360
361fn check_positive(field: &str, value: f64) -> Result<()> {
362 if value > 0.0 {
363 return Ok(());
364 }
365 Err(MqRestError::InvalidConfig {
366 message: format!("{field} must be positive, got {value}"),
367 })
368}
369
370fn has_status(
371 rows: &[HashMap<String, Value>],
372 status_keys: &[&str],
373 target_values: &[&str],
374) -> bool {
375 for row in rows {
376 for key in status_keys {
377 if let Some(Value::String(value)) = row.get(*key)
378 && target_values.contains(&value.as_str())
379 {
380 return true;
381 }
382 }
383 }
384 false
385}
386
#[cfg(test)]
mod tests {
    //! Unit tests for the synchronous start/stop/restart helpers.
    //!
    //! The session is driven by a `MockTransport` loaded with a queue of
    //! responses, one per request. The tests assume that a request made after
    //! the queue is exhausted fails, which is how the "command fails" and
    //! "poll fails" cases are simulated.

    use super::*;
    use crate::test_helpers::{
        MockTransport, empty_success_response, mock_session, success_response,
    };
    use serde_json::json;

    /// Poll responses queued for tests that must run into the timeout.
    ///
    /// `fast_config` allows at most about 50 polls (0.5 s timeout with a
    /// 0.01 s sleep before each poll), so 60 responses cannot run out first.
    const UNSETTLED_POLL_RESPONSES: usize = 60;

    /// Short timings so that timeout tests finish quickly.
    fn fast_config() -> SyncConfig {
        SyncConfig {
            timeout_seconds: 0.5,
            poll_interval_seconds: 0.01,
        }
    }

    /// A successful response holding a single row `{key: value}`.
    fn status_response(key: &str, value: &str) -> crate::transport::TransportResponse {
        let mut params = HashMap::new();
        params.insert(key.into(), json!(value));
        success_response(vec![params])
    }

    /// An accepted command followed by `UNSETTLED_POLL_RESPONSES` copies of
    /// `poll_response()`, i.e. an object that never reaches the target state.
    fn unsettled_responses(
        poll_response: impl Fn() -> crate::transport::TransportResponse,
    ) -> Vec<crate::transport::TransportResponse> {
        let mut responses = vec![empty_success_response()];
        responses.extend(std::iter::repeat_with(poll_response).take(UNSETTLED_POLL_RESPONSES));
        responses
    }

    // ----- SyncConfig defaults -----

    #[test]
    fn sync_config_default_values() {
        let config = SyncConfig::default();
        assert!((config.timeout_seconds - 30.0).abs() < f64::EPSILON);
        assert!((config.poll_interval_seconds - 1.0).abs() < f64::EPSILON);
    }

    // ----- has_status -----

    #[test]
    fn has_status_match_first_key() {
        let mut row = HashMap::new();
        row.insert("channel_status".into(), json!("RUNNING"));
        assert!(has_status(
            &[row],
            &["channel_status", "STATUS"],
            &["RUNNING"]
        ));
    }

    #[test]
    fn has_status_match_second_key() {
        let mut row = HashMap::new();
        row.insert("STATUS".into(), json!("STOPPED"));
        assert!(has_status(
            &[row],
            &["channel_status", "STATUS"],
            &["STOPPED"]
        ));
    }

    #[test]
    fn has_status_no_match() {
        let mut row = HashMap::new();
        row.insert("STATUS".into(), json!("STARTING"));
        assert!(!has_status(
            &[row],
            &["channel_status", "STATUS"],
            &["RUNNING"]
        ));
    }

    #[test]
    fn has_status_empty_rows() {
        assert!(!has_status(&[], &["STATUS"], &["RUNNING"]));
    }

    #[test]
    fn has_status_non_string_value() {
        let mut row = HashMap::new();
        row.insert("STATUS".into(), json!(42));
        assert!(!has_status(&[row], &["STATUS"], &["RUNNING"]));
    }

    // ----- start -----

    #[test]
    fn start_channel_sync_first_poll_running() {
        let transport = MockTransport::new(vec![
            // START command.
            empty_success_response(),
            // First status poll.
            status_response("channel_status", "RUNNING"),
        ]);
        let mut session = mock_session(transport);
        let result = session
            .start_channel_sync("MY.CH", Some(fast_config()))
            .unwrap();
        assert_eq!(result.operation, SyncOperation::Started);
        assert!(result.polls >= 1);
    }

    #[test]
    fn start_channel_sync_timeout() {
        // The channel stays in STARTING on every poll.
        let transport = MockTransport::new(unsettled_responses(|| {
            status_response("channel_status", "STARTING")
        }));
        let mut session = mock_session(transport);
        let err = session
            .start_channel_sync("MY.CH", Some(fast_config()))
            .unwrap_err();
        assert!(
            matches!(err, MqRestError::Timeout { .. }),
            "expected Timeout, got {err:?}"
        );
    }

    // ----- stop -----

    #[test]
    fn stop_channel_sync_returns_stopped_via_status() {
        let transport = MockTransport::new(vec![
            // STOP command.
            empty_success_response(),
            // First status poll.
            status_response("STATUS", "STOPPED"),
        ]);
        let mut session = mock_session(transport);
        let result = session
            .stop_channel_sync("MY.CH", Some(fast_config()))
            .unwrap();
        assert_eq!(result.operation, SyncOperation::Stopped);
    }

    #[test]
    fn stop_channel_sync_empty_means_stopped() {
        // STOP command, then a status poll with no rows: for channels that
        // already counts as stopped.
        let transport =
            MockTransport::new(vec![empty_success_response(), empty_success_response()]);
        let mut session = mock_session(transport);
        let result = session
            .stop_channel_sync("MY.CH", Some(fast_config()))
            .unwrap();
        assert_eq!(result.operation, SyncOperation::Stopped);
    }

    #[test]
    fn stop_listener_sync_empty_rows_not_stopped() {
        // For listeners an empty status result is NOT "stopped", so polling
        // continues until the timeout.
        let transport = MockTransport::new(unsettled_responses(empty_success_response));
        let mut session = mock_session(transport);
        let err = session
            .stop_listener_sync("MY.LIS", Some(fast_config()))
            .unwrap_err();
        assert!(
            matches!(err, MqRestError::Timeout { .. }),
            "expected Timeout, got {err:?}"
        );
    }

    #[test]
    fn stop_listener_sync_stopped_status() {
        let transport = MockTransport::new(vec![
            // STOP command.
            empty_success_response(),
            // First status poll.
            status_response("status", "STOPPED"),
        ]);
        let mut session = mock_session(transport);
        let result = session
            .stop_listener_sync("MY.LIS", Some(fast_config()))
            .unwrap();
        assert_eq!(result.operation, SyncOperation::Stopped);
    }

    // ----- restart -----

    #[test]
    fn restart_channel_both_phases_succeed() {
        let transport = MockTransport::new(vec![
            // STOP command.
            empty_success_response(),
            // Status poll with no rows: channel counts as stopped.
            empty_success_response(),
            // START command.
            empty_success_response(),
            // Status poll: running.
            status_response("channel_status", "RUNNING"),
        ]);
        let mut session = mock_session(transport);
        let result = session
            .restart_channel("MY.CH", Some(fast_config()))
            .unwrap();
        assert_eq!(result.operation, SyncOperation::Restarted);
        assert!(result.polls >= 2);
    }

    #[test]
    fn restart_channel_stop_phase_fails() {
        // No responses queued: the STOP command itself fails.
        let transport = MockTransport::new(vec![]);
        let mut session = mock_session(transport);
        let result = session.restart_channel("MY.CH", Some(fast_config()));
        assert!(result.is_err());
    }

    #[test]
    fn restart_channel_start_phase_fails() {
        let transport = MockTransport::new(vec![
            // STOP command.
            empty_success_response(),
            // Status poll with no rows: channel counts as stopped.
            empty_success_response(),
            // Nothing left for the START command, so the start phase fails.
        ]);
        let mut session = mock_session(transport);
        let result = session.restart_channel("MY.CH", Some(fast_config()));
        assert!(result.is_err());
    }

    // ----- happy-path matrix for every public wrapper -----
    //
    // Each macro generates `test_<method>_ok`. `$status_key` is the row key
    // under which the mock reports the status for that object type.

    macro_rules! test_start_sync {
        ($method:ident, $status_key:literal) => {
            paste::paste! {
                #[test]
                fn [<test_ $method _ok>]() {
                    let transport = MockTransport::new(vec![
                        empty_success_response(),
                        status_response($status_key, "RUNNING"),
                    ]);
                    let mut session = mock_session(transport);
                    let result = session.$method("OBJ", Some(fast_config())).unwrap();
                    assert_eq!(result.operation, SyncOperation::Started);
                }
            }
        };
    }

    macro_rules! test_stop_sync {
        ($method:ident, $status_key:literal) => {
            paste::paste! {
                #[test]
                fn [<test_ $method _ok>]() {
                    let transport = MockTransport::new(vec![
                        empty_success_response(),
                        status_response($status_key, "STOPPED"),
                    ]);
                    let mut session = mock_session(transport);
                    let result = session.$method("OBJ", Some(fast_config())).unwrap();
                    assert_eq!(result.operation, SyncOperation::Stopped);
                }
            }
        };
    }

    macro_rules! test_restart {
        ($method:ident, $status_key:literal) => {
            paste::paste! {
                #[test]
                fn [<test_ $method _ok>]() {
                    let transport = MockTransport::new(vec![
                        empty_success_response(),
                        status_response($status_key, "STOPPED"),
                        empty_success_response(),
                        status_response($status_key, "RUNNING"),
                    ]);
                    let mut session = mock_session(transport);
                    let result = session.$method("OBJ", Some(fast_config())).unwrap();
                    assert_eq!(result.operation, SyncOperation::Restarted);
                }
            }
        };
    }

    test_start_sync!(start_channel_sync, "channel_status");
    test_start_sync!(start_listener_sync, "status");
    test_start_sync!(start_service_sync, "status");

    test_stop_sync!(stop_channel_sync, "channel_status");
    test_stop_sync!(stop_listener_sync, "status");
    test_stop_sync!(stop_service_sync, "status");

    test_restart!(restart_channel, "channel_status");
    test_restart!(restart_listener, "status");
    test_restart!(restart_service, "status");

    // ----- error propagation -----

    #[test]
    fn start_channel_sync_start_command_fails() {
        // No responses queued: the START command itself fails.
        let transport = MockTransport::new(vec![]);
        let mut session = mock_session(transport);
        let result = session.start_channel_sync("MY.CH", Some(fast_config()));
        assert!(result.is_err());
    }

    #[test]
    fn start_channel_sync_poll_fails() {
        let transport = MockTransport::new(vec![
            // START command succeeds; nothing is left for the first poll.
            empty_success_response(),
        ]);
        let mut session = mock_session(transport);
        let result = session.start_channel_sync("MY.CH", Some(fast_config()));
        assert!(result.is_err());
    }

    #[test]
    fn stop_channel_sync_stop_command_fails() {
        // No responses queued: the STOP command itself fails.
        let transport = MockTransport::new(vec![]);
        let mut session = mock_session(transport);
        let result = session.stop_channel_sync("MY.CH", Some(fast_config()));
        assert!(result.is_err());
    }

    #[test]
    fn stop_channel_sync_poll_fails() {
        let transport = MockTransport::new(vec![
            // STOP command succeeds; nothing is left for the first poll.
            empty_success_response(),
        ]);
        let mut session = mock_session(transport);
        let result = session.stop_channel_sync("MY.CH", Some(fast_config()));
        assert!(result.is_err());
    }

    // ----- SyncConfig::new validation -----

    #[test]
    fn sync_config_new_valid() {
        let config = SyncConfig::new(10.0, 0.5).unwrap();
        assert!((config.timeout_seconds - 10.0).abs() < f64::EPSILON);
        assert!((config.poll_interval_seconds - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn sync_config_new_zero_timeout_rejected() {
        let err = SyncConfig::new(0.0, 1.0).unwrap_err();
        assert!(
            format!("{err}").contains("timeout_seconds must be positive"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn sync_config_new_negative_timeout_rejected() {
        let err = SyncConfig::new(-1.0, 1.0).unwrap_err();
        assert!(
            format!("{err}").contains("timeout_seconds must be positive"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn sync_config_new_zero_poll_interval_rejected() {
        let err = SyncConfig::new(30.0, 0.0).unwrap_err();
        assert!(
            format!("{err}").contains("poll_interval_seconds must be positive"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn sync_config_new_negative_poll_interval_rejected() {
        let err = SyncConfig::new(30.0, -1.0).unwrap_err();
        assert!(
            format!("{err}").contains("poll_interval_seconds must be positive"),
            "unexpected error: {err}"
        );
    }
}