1use crate::ssh_utils::CommandResult;
9use crate::types::{WorkerConfig, WorkerId};
10use std::cell::Cell;
11use std::collections::HashMap;
12use std::sync::atomic::{AtomicU32, Ordering};
13use std::sync::{Arc, Mutex, OnceLock};
14use std::time::Instant;
15use tracing::{debug, info};
16
/// Interprets the environment variable `key` as a boolean flag.
///
/// Returns `true` only when the variable is set to `"1"` or to `"true"` in
/// any letter case. An unset or unreadable variable, and every other value,
/// yields `false`.
fn env_flag(key: &str) -> bool {
    match std::env::var(key) {
        Ok(value) => value == "1" || value.eq_ignore_ascii_case("true"),
        Err(_) => false,
    }
}
22
/// Process-wide override state kept behind the mutex returned by `overrides()`.
#[derive(Debug, Default, Clone)]
struct MockOverrides {
    /// Global mock-mode override; `is_mock_enabled` consults it after the
    /// thread-local override and before the `RCH_MOCK_SSH` environment variable.
    enabled: Option<bool>,
    /// When set, `MockConfig::from_env` returns a clone of this instead of
    /// reading the environment.
    ssh_config: Option<MockConfig>,
    /// When set, `MockRsyncConfig::from_env` returns a clone of this instead of
    /// reading the environment.
    rsync_config: Option<MockRsyncConfig>,
    /// Incremented by `set_mock_enabled_override(Some(_))` and decremented by
    /// `clear_mock_overrides`; the overrides are only wiped when it reaches 0.
    active_scopes: usize,
}
36
37fn overrides() -> &'static Mutex<MockOverrides> {
38 static OVERRIDES: OnceLock<Mutex<MockOverrides>> = OnceLock::new();
39 OVERRIDES.get_or_init(|| Mutex::new(MockOverrides::default()))
40}
41
thread_local! {
    // Per-thread mock-mode override. `None` means "no override on this thread";
    // `is_mock_enabled` returns a `Some` value from here before looking at the
    // global override or the environment.
    static THREAD_MOCK_ENABLED: Cell<Option<bool>> = const { Cell::new(None) };
}
60
61pub fn set_thread_mock_override(enabled: Option<bool>) {
67 THREAD_MOCK_ENABLED.with(|c| c.set(enabled));
68}
69
70pub fn clear_thread_mock_override() {
72 THREAD_MOCK_ENABLED.with(|c| c.set(None));
73}
74
75pub fn set_mock_enabled_override(enabled: Option<bool>) {
81 let mut guard = overrides().lock().unwrap();
82 if enabled.is_some() {
83 guard.active_scopes = guard.active_scopes.saturating_add(1);
84 }
85 guard.enabled = enabled;
86 drop(guard);
87 THREAD_MOCK_ENABLED.with(|c| c.set(enabled));
89}
90
91pub fn set_mock_ssh_config_override(config: Option<MockConfig>) {
93 overrides().lock().unwrap().ssh_config = config;
94}
95
96pub fn set_mock_rsync_config_override(config: Option<MockRsyncConfig>) {
98 overrides().lock().unwrap().rsync_config = config;
99}
100
101pub fn clear_mock_overrides() {
103 let mut guard = overrides().lock().unwrap();
104 if guard.active_scopes > 0 {
105 guard.active_scopes -= 1;
106 }
107 if guard.active_scopes == 0 {
108 guard.enabled = None;
109 guard.ssh_config = None;
110 guard.rsync_config = None;
111 }
112 drop(guard);
113 THREAD_MOCK_ENABLED.with(|c| c.set(None));
116}
117
118pub fn is_mock_enabled() -> bool {
125 if let Some(enabled) = THREAD_MOCK_ENABLED.with(|c| c.get()) {
127 return enabled;
128 }
129 if let Some(enabled) = overrides().lock().unwrap().enabled {
130 return enabled;
131 }
132 std::env::var("RCH_MOCK_SSH")
133 .map(|v| v == "1" || v.to_lowercase() == "true")
134 .unwrap_or(false)
135}
136
/// Returns `true` when `host` begins with the `mock://` scheme prefix.
///
/// The comparison is case-sensitive and does not trim whitespace.
pub fn is_mock_host(host: &str) -> bool {
    host.strip_prefix("mock://").is_some()
}
141
142pub fn is_mock_worker(worker: &WorkerConfig) -> bool {
144 is_mock_host(&worker.host)
145}
146
147fn global_ssh_invocations() -> &'static Mutex<Vec<MockInvocation>> {
148 static GLOBAL: OnceLock<Mutex<Vec<MockInvocation>>> = OnceLock::new();
149 GLOBAL.get_or_init(|| Mutex::new(Vec::new()))
150}
151
152fn global_rsync_invocations() -> &'static Mutex<Vec<MockSyncInvocation>> {
153 static GLOBAL: OnceLock<Mutex<Vec<MockSyncInvocation>>> = OnceLock::new();
154 GLOBAL.get_or_init(|| Mutex::new(Vec::new()))
155}
156
157pub fn clear_global_invocations() {
159 global_ssh_invocations().lock().unwrap().clear();
160 global_rsync_invocations().lock().unwrap().clear();
161}
162
163pub fn global_ssh_invocations_snapshot() -> Vec<MockInvocation> {
165 global_ssh_invocations().lock().unwrap().clone()
166}
167
168pub fn global_rsync_invocations_snapshot() -> Vec<MockSyncInvocation> {
170 global_rsync_invocations().lock().unwrap().clone()
171}
172
/// Stage of a mock operation, used to tag log lines and recorded invocations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Phase {
    /// Pushing files to the worker (`MockRsync::sync_to_remote`).
    Sync,
    /// Running a command on the worker (`MockSshClient::execute`).
    Execute,
    /// Pulling artifacts back (`MockRsync::retrieve_artifacts`).
    Artifacts,
    /// Opening the mock connection (`MockSshClient::connect`).
    Connect,
    /// Closing the mock connection (`MockSshClient::disconnect`).
    Disconnect,
}

impl std::fmt::Display for Phase {
    /// Writes the fixed upper-case label of the phase (`Execute` is `EXEC`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Phase::Sync => "SYNC",
            Phase::Execute => "EXEC",
            Phase::Artifacts => "ARTIFACTS",
            Phase::Connect => "CONNECT",
            Phase::Disconnect => "DISCONNECT",
        };
        f.write_str(label)
    }
}
199
200pub fn log_phase(phase: Phase, message: &str) {
202 let timestamp = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ");
203 info!("[{}] [{}] {}", timestamp, phase, message);
204}
205
206pub fn debug_phase(phase: Phase, message: &str) {
208 let timestamp = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ");
209 debug!("[{}] [{}] {}", timestamp, phase, message);
210}
211
/// One recorded call made through a `MockSshClient`.
#[derive(Debug, Clone)]
pub struct MockInvocation {
    /// Identifier of the worker the client was created for.
    pub worker_id: WorkerId,
    /// Command text for `execute` calls; `None` for connect/disconnect records.
    pub command: Option<String>,
    /// Which operation produced this record.
    pub phase: Phase,
    /// Wall-clock time at which the record was created.
    pub timestamp: std::time::SystemTime,
}
224
/// Behaviour knobs for `MockSshClient`.
#[derive(Debug, Clone)]
pub struct MockConfig {
    /// Exit code of the result returned when no more specific rule applies.
    pub default_exit_code: i32,
    /// Stdout of the result returned when no more specific rule applies.
    pub default_stdout: String,
    /// Stderr of the result returned when no more specific rule applies.
    pub default_stderr: String,
    /// When `true`, every `connect` call returns an error.
    pub fail_connect: bool,
    /// Number of `connect` calls that fail with a transient error before
    /// connections start succeeding.
    pub fail_connect_attempts: u32,
    /// When `true`, every `execute` call on a connected client returns an error.
    pub fail_execute: bool,
    /// Number of `execute` calls that fail with a transient error before
    /// execution starts succeeding.
    pub fail_execute_attempts: u32,
    /// Simulated execution time, in milliseconds, slept inside `execute`.
    pub execution_delay_ms: u64,
    /// Canned results keyed by the exact command string.
    pub command_results: HashMap<String, CommandResult>,
    /// When `true`, commands containing `rustup` together with
    /// `toolchain install` or `run` yield exit code 1.
    pub fail_toolchain_install: bool,
    /// When `true`, any command containing `rustup` yields exit code 127.
    pub no_rustup: bool,
}
251
252impl Default for MockConfig {
253 fn default() -> Self {
254 Self {
255 default_exit_code: 0,
256 default_stdout: String::new(),
257 default_stderr: String::new(),
258 fail_connect: false,
259 fail_connect_attempts: 0,
260 fail_execute: false,
261 fail_execute_attempts: 0,
262 execution_delay_ms: 10,
263 command_results: HashMap::new(),
264 fail_toolchain_install: false,
265 no_rustup: false,
266 }
267 }
268}
269
270impl MockConfig {
271 pub fn success() -> Self {
273 Self::default()
274 }
275
276 pub fn connection_failure() -> Self {
278 Self {
279 fail_connect: true,
280 ..Self::default()
281 }
282 }
283
284 pub fn command_failure(exit_code: i32, stderr: &str) -> Self {
286 Self {
287 default_exit_code: exit_code,
288 default_stderr: stderr.to_string(),
289 fail_execute: true,
290 ..Self::default()
291 }
292 }
293
294 pub fn with_command_result(mut self, command: &str, result: CommandResult) -> Self {
296 self.command_results.insert(command.to_string(), result);
297 self
298 }
299
300 pub fn with_stdout(mut self, stdout: &str) -> Self {
302 self.default_stdout = stdout.to_string();
303 self
304 }
305
306 pub fn from_env() -> Self {
308 if let Some(config) = overrides().lock().unwrap().ssh_config.clone() {
309 return config;
310 }
311
312 let mut config = MockConfig::default();
313
314 if let Ok(val) = std::env::var("RCH_MOCK_SSH_EXIT_CODE")
315 && let Ok(code) = val.parse()
316 {
317 config.default_exit_code = code;
318 }
319 if let Ok(val) = std::env::var("RCH_MOCK_SSH_STDOUT") {
320 config.default_stdout = val;
321 }
322 if let Ok(val) = std::env::var("RCH_MOCK_SSH_STDERR") {
323 config.default_stderr = val;
324 }
325 if let Ok(val) = std::env::var("RCH_MOCK_SSH_DELAY_MS")
326 && let Ok(delay) = val.parse()
327 {
328 config.execution_delay_ms = delay;
329 }
330
331 config.fail_connect = env_flag("RCH_MOCK_SSH_FAIL_CONNECT");
332 config.fail_execute = env_flag("RCH_MOCK_SSH_FAIL_EXECUTE");
333
334 if let Ok(val) = std::env::var("RCH_MOCK_SSH_FAIL_CONNECT_ATTEMPTS")
335 && let Ok(count) = val.parse()
336 {
337 config.fail_connect_attempts = count;
338 }
339 if let Ok(val) = std::env::var("RCH_MOCK_SSH_FAIL_EXECUTE_ATTEMPTS")
340 && let Ok(count) = val.parse()
341 {
342 config.fail_execute_attempts = count;
343 }
344
345 config.fail_toolchain_install = env_flag("RCH_MOCK_TOOLCHAIN_INSTALL_FAIL");
346 config.no_rustup = env_flag("RCH_MOCK_NO_RUSTUP");
347
348 config
349 }
350
351 pub fn toolchain_install_failure() -> Self {
353 Self {
354 fail_toolchain_install: true,
355 ..Self::default()
356 }
357 }
358
359 pub fn no_rustup() -> Self {
361 Self {
362 no_rustup: true,
363 ..Self::default()
364 }
365 }
366}
367
/// In-process stand-in for an SSH client talking to one worker.
pub struct MockSshClient {
    /// Worker this client pretends to be connected to.
    config: WorkerConfig,
    /// Behaviour knobs controlling successes, failures and canned output.
    mock_config: MockConfig,
    /// Set by a successful `connect`, cleared by `disconnect`.
    connected: bool,
    /// Calls recorded by this client (also mirrored into the global list).
    invocations: Arc<Mutex<Vec<MockInvocation>>>,
    /// Remaining `connect` calls that should fail transiently.
    connect_failures_remaining: AtomicU32,
    /// Remaining `execute` calls that should fail transiently.
    execute_failures_remaining: AtomicU32,
}
383
384impl MockSshClient {
385 pub fn new(config: WorkerConfig, mock_config: MockConfig) -> Self {
387 Self {
388 config,
389 connect_failures_remaining: AtomicU32::new(mock_config.fail_connect_attempts),
390 execute_failures_remaining: AtomicU32::new(mock_config.fail_execute_attempts),
391 mock_config,
392 connected: false,
393 invocations: Arc::new(Mutex::new(Vec::new())),
394 }
395 }
396
397 pub fn new_default(config: WorkerConfig) -> Self {
399 Self::new(config, MockConfig::default())
400 }
401
402 pub fn worker_id(&self) -> &WorkerId {
404 &self.config.id
405 }
406
407 pub fn is_connected(&self) -> bool {
409 self.connected
410 }
411
412 pub fn invocations(&self) -> Vec<MockInvocation> {
414 self.invocations.lock().unwrap().clone()
415 }
416
417 pub fn clear_invocations(&self) {
419 self.invocations.lock().unwrap().clear();
420 }
421
422 fn record(&self, phase: Phase, command: Option<String>) {
423 let invocation = MockInvocation {
424 worker_id: self.config.id.clone(),
425 command,
426 phase,
427 timestamp: std::time::SystemTime::now(),
428 };
429
430 let mut invocations = self.invocations.lock().unwrap();
431 invocations.push(invocation.clone());
432
433 let mut global = global_ssh_invocations().lock().unwrap();
434 global.push(invocation);
435 }
436
437 pub async fn connect(&mut self) -> anyhow::Result<()> {
439 log_phase(
440 Phase::Connect,
441 &format!("Connecting to mock worker {}", self.config.id),
442 );
443 self.record(Phase::Connect, None);
444
445 if self.mock_config.fail_connect {
446 log_phase(
447 Phase::Connect,
448 &format!("Mock connection failed for {}", self.config.id),
449 );
450 return Err(anyhow::anyhow!(
451 "Mock: Connection failed to {}",
452 self.config.id
453 ));
454 }
455
456 if self
458 .connect_failures_remaining
459 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
460 current.checked_sub(1)
461 })
462 .is_ok()
463 {
464 log_phase(
465 Phase::Connect,
466 &format!("Mock transient connect failure for {}", self.config.id),
467 );
468 return Err(anyhow::anyhow!(
469 "Mock: Connection timed out to {}",
470 self.config.id
471 ));
472 }
473
474 tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
476
477 self.connected = true;
478 log_phase(
479 Phase::Connect,
480 &format!("Mock connected to {}", self.config.id),
481 );
482 Ok(())
483 }
484
485 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
487 log_phase(
488 Phase::Disconnect,
489 &format!("Disconnecting from mock worker {}", self.config.id),
490 );
491 self.record(Phase::Disconnect, None);
492 self.connected = false;
493 Ok(())
494 }
495
496 pub async fn execute(&self, command: &str) -> anyhow::Result<CommandResult> {
498 if !self.connected {
499 return Err(anyhow::anyhow!("Mock: Not connected to worker"));
500 }
501
502 log_phase(
503 Phase::Execute,
504 &format!("Executing on {}: {}", self.config.id, command),
505 );
506 self.record(Phase::Execute, Some(command.to_string()));
507
508 if self.mock_config.fail_execute {
509 log_phase(
510 Phase::Execute,
511 &format!("Mock execution failed for {}", self.config.id),
512 );
513 return Err(anyhow::anyhow!("Mock: Command execution failed"));
514 }
515
516 if self
518 .execute_failures_remaining
519 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
520 current.checked_sub(1)
521 })
522 .is_ok()
523 {
524 log_phase(
525 Phase::Execute,
526 &format!("Mock transient execute failure for {}", self.config.id),
527 );
528 return Err(anyhow::anyhow!("Mock: Broken pipe"));
529 }
530
531 let start = Instant::now();
533 tokio::time::sleep(tokio::time::Duration::from_millis(
534 self.mock_config.execution_delay_ms,
535 ))
536 .await;
537
538 if self.mock_config.no_rustup && command.contains("rustup") {
540 log_phase(
541 Phase::Execute,
542 "Mock: rustup not available (no_rustup mode)",
543 );
544 return Ok(CommandResult {
545 exit_code: 127,
546 stdout: String::new(),
547 stderr: "rustup: command not found".to_string(),
548 duration_ms: start.elapsed().as_millis() as u64,
549 });
550 }
551
552 if self.mock_config.fail_toolchain_install
553 && command.contains("rustup")
554 && (command.contains("toolchain install") || command.contains("run"))
555 {
556 log_phase(
557 Phase::Execute,
558 "Mock: toolchain install failed (fail_toolchain_install mode)",
559 );
560 return Ok(CommandResult {
561 exit_code: 1,
562 stdout: String::new(),
563 stderr: "error: toolchain 'nightly-2024-01-15' is not installed".to_string(),
564 duration_ms: start.elapsed().as_millis() as u64,
565 });
566 }
567
568 if let Some(result) = self.mock_config.command_results.get(command) {
570 log_phase(
571 Phase::Execute,
572 &format!(
573 "Mock command completed (specific): exit={}",
574 result.exit_code
575 ),
576 );
577 return Ok(result.clone());
578 }
579
580 let result = CommandResult {
581 exit_code: self.mock_config.default_exit_code,
582 stdout: self.mock_config.default_stdout.clone(),
583 stderr: self.mock_config.default_stderr.clone(),
584 duration_ms: start.elapsed().as_millis() as u64,
585 };
586
587 log_phase(
588 Phase::Execute,
589 &format!("Mock command completed: exit={}", result.exit_code),
590 );
591 Ok(result)
592 }
593
594 pub async fn execute_streaming<F, G>(
596 &self,
597 command: &str,
598 mut on_stdout: F,
599 mut on_stderr: G,
600 ) -> anyhow::Result<CommandResult>
601 where
602 F: FnMut(&str),
603 G: FnMut(&str),
604 {
605 let result = self.execute(command).await?;
606
607 for line in result.stdout.lines() {
609 on_stdout(&format!("{}\n", line));
610 }
611 for line in result.stderr.lines() {
612 on_stderr(&format!("{}\n", line));
613 }
614
615 Ok(result)
616 }
617
618 pub async fn health_check(&self) -> anyhow::Result<bool> {
620 match self.execute("echo ok").await {
621 Ok(result) => Ok(result.exit_code == 0),
622 Err(_) => Ok(false),
623 }
624 }
625}
626
/// Outcome reported by a successful mock transfer.
#[derive(Debug, Clone)]
pub struct MockRsyncResult {
    /// Number of files the mock claims to have transferred.
    pub files_transferred: u32,
    /// Number of bytes the mock claims to have transferred.
    pub bytes_transferred: u64,
    /// Measured duration of the simulated transfer, in milliseconds.
    pub duration_ms: u64,
}
637
/// In-process stand-in for rsync transfers to and from a worker.
pub struct MockRsync {
    /// Transfers recorded by this instance (also mirrored into the global list).
    sync_invocations: Arc<Mutex<Vec<MockSyncInvocation>>>,
    /// Behaviour knobs controlling failures and reported transfer sizes.
    config: MockRsyncConfig,
    /// Remaining `sync_to_remote` calls that should fail transiently.
    sync_failures_remaining: AtomicU32,
    /// Remaining `retrieve_artifacts` calls that should fail transiently.
    artifacts_failures_remaining: AtomicU32,
}
649
/// Behaviour knobs for `MockRsync`.
///
/// The derived `Default` leaves every flag `false` and every number `0`;
/// `MockRsyncConfig::success()` is the variant with non-zero transfer sizes.
#[derive(Debug, Clone, Default)]
pub struct MockRsyncConfig {
    /// When `true`, every `sync_to_remote` call fails.
    pub fail_sync: bool,
    /// Number of `sync_to_remote` calls that fail transiently before succeeding.
    pub fail_sync_attempts: u32,
    /// When `true`, every `retrieve_artifacts` call fails.
    pub fail_artifacts: bool,
    /// Number of `retrieve_artifacts` calls that fail transiently before succeeding.
    pub fail_artifacts_attempts: u32,
    /// File count reported by a successful sync (artifact retrieval reports half).
    pub files_per_sync: u32,
    /// Byte count reported by a successful sync (artifact retrieval reports double).
    pub bytes_per_sync: u64,
}
666
667impl MockRsyncConfig {
668 pub fn success() -> Self {
670 Self {
671 fail_sync: false,
672 fail_sync_attempts: 0,
673 fail_artifacts: false,
674 fail_artifacts_attempts: 0,
675 files_per_sync: 10,
676 bytes_per_sync: 1024 * 100,
677 }
678 }
679
680 pub fn sync_failure() -> Self {
682 Self {
683 fail_sync: true,
684 ..Self::default()
685 }
686 }
687
688 pub fn artifact_failure() -> Self {
690 Self {
691 fail_artifacts: true,
692 ..Self::default()
693 }
694 }
695
696 pub fn from_env() -> Self {
698 if let Some(config) = overrides().lock().unwrap().rsync_config.clone() {
699 return config;
700 }
701
702 let mut config = MockRsyncConfig::success();
703
704 config.fail_sync = env_flag("RCH_MOCK_RSYNC_FAIL_SYNC");
705 config.fail_artifacts = env_flag("RCH_MOCK_RSYNC_FAIL_ARTIFACTS");
706
707 if let Ok(val) = std::env::var("RCH_MOCK_RSYNC_FAIL_SYNC_ATTEMPTS")
708 && let Ok(count) = val.parse()
709 {
710 config.fail_sync_attempts = count;
711 }
712 if let Ok(val) = std::env::var("RCH_MOCK_RSYNC_FAIL_ARTIFACTS_ATTEMPTS")
713 && let Ok(count) = val.parse()
714 {
715 config.fail_artifacts_attempts = count;
716 }
717
718 if let Ok(val) = std::env::var("RCH_MOCK_RSYNC_FILES")
719 && let Ok(files) = val.parse()
720 {
721 config.files_per_sync = files;
722 }
723 if let Ok(val) = std::env::var("RCH_MOCK_RSYNC_BYTES")
724 && let Ok(bytes) = val.parse()
725 {
726 config.bytes_per_sync = bytes;
727 }
728
729 config
730 }
731}
732
/// One recorded transfer made through a `MockRsync`.
#[derive(Debug, Clone)]
pub struct MockSyncInvocation {
    /// Source path or spec exactly as passed by the caller.
    pub source: String,
    /// Destination path or spec exactly as passed by the caller.
    pub destination: String,
    /// `Phase::Sync` for `sync_to_remote`, `Phase::Artifacts` for `retrieve_artifacts`.
    pub phase: Phase,
    /// Wall-clock time at which the record was created.
    pub timestamp: std::time::SystemTime,
}
745
746impl MockRsync {
747 pub fn new(config: MockRsyncConfig) -> Self {
749 let sync_failures_remaining = AtomicU32::new(config.fail_sync_attempts);
750 let artifacts_failures_remaining = AtomicU32::new(config.fail_artifacts_attempts);
751 Self {
752 sync_invocations: Arc::new(Mutex::new(Vec::new())),
753 sync_failures_remaining,
754 artifacts_failures_remaining,
755 config,
756 }
757 }
758
759 pub fn new_default() -> Self {
761 Self::new(MockRsyncConfig::success())
762 }
763
764 pub fn invocations(&self) -> Vec<MockSyncInvocation> {
766 self.sync_invocations.lock().unwrap().clone()
767 }
768
769 pub async fn sync_to_remote(
771 &self,
772 source: &str,
773 destination: &str,
774 _exclude_patterns: &[String],
775 ) -> anyhow::Result<MockRsyncResult> {
776 log_phase(
777 Phase::Sync,
778 &format!("Mock sync: {} -> {}", source, destination),
779 );
780
781 {
782 let invocation = MockSyncInvocation {
783 source: source.to_string(),
784 destination: destination.to_string(),
785 phase: Phase::Sync,
786 timestamp: std::time::SystemTime::now(),
787 };
788 let mut invocations = self.sync_invocations.lock().unwrap();
789 invocations.push(invocation.clone());
790 global_rsync_invocations().lock().unwrap().push(invocation);
791 }
792
793 if self.config.fail_sync {
794 log_phase(Phase::Sync, "Mock sync failed");
795 return Err(anyhow::anyhow!("Mock: Sync failed"));
796 }
797
798 if self
800 .sync_failures_remaining
801 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
802 current.checked_sub(1)
803 })
804 .is_ok()
805 {
806 log_phase(Phase::Sync, "Mock transient sync failure");
807 return Err(anyhow::anyhow!(
808 "Mock: Sync failed (transient) - Connection timed out"
809 ));
810 }
811
812 let start = Instant::now();
814 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
815
816 let result = MockRsyncResult {
817 files_transferred: self.config.files_per_sync,
818 bytes_transferred: self.config.bytes_per_sync,
819 duration_ms: start.elapsed().as_millis() as u64,
820 };
821
822 log_phase(
823 Phase::Sync,
824 &format!(
825 "Mock sync complete: {} files, {} bytes",
826 result.files_transferred, result.bytes_transferred
827 ),
828 );
829
830 Ok(result)
831 }
832
833 pub async fn retrieve_artifacts(
835 &self,
836 source: &str,
837 destination: &str,
838 _artifact_patterns: &[String],
839 ) -> anyhow::Result<MockRsyncResult> {
840 log_phase(
841 Phase::Artifacts,
842 &format!("Mock artifact retrieval: {} -> {}", source, destination),
843 );
844
845 {
846 let invocation = MockSyncInvocation {
847 source: source.to_string(),
848 destination: destination.to_string(),
849 phase: Phase::Artifacts,
850 timestamp: std::time::SystemTime::now(),
851 };
852 let mut invocations = self.sync_invocations.lock().unwrap();
853 invocations.push(invocation.clone());
854 global_rsync_invocations().lock().unwrap().push(invocation);
855 }
856
857 if self.config.fail_artifacts {
858 log_phase(Phase::Artifacts, "Mock artifact retrieval failed");
859 return Err(anyhow::anyhow!("Mock: Artifact retrieval failed"));
860 }
861
862 if self
864 .artifacts_failures_remaining
865 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
866 current.checked_sub(1)
867 })
868 .is_ok()
869 {
870 log_phase(
871 Phase::Artifacts,
872 "Mock transient artifact retrieval failure",
873 );
874 return Err(anyhow::anyhow!(
875 "Mock: Artifact retrieval failed (transient) - Connection reset by peer"
876 ));
877 }
878
879 let start = Instant::now();
881 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
882
883 let result = MockRsyncResult {
884 files_transferred: self.config.files_per_sync / 2,
885 bytes_transferred: self.config.bytes_per_sync * 2,
886 duration_ms: start.elapsed().as_millis() as u64,
887 };
888
889 log_phase(
890 Phase::Artifacts,
891 &format!(
892 "Mock artifact retrieval complete: {} files, {} bytes",
893 result.files_transferred, result.bytes_transferred
894 ),
895 );
896
897 Ok(result)
898 }
899}
900
901#[cfg(test)]
902mod tests {
903 use super::*;
904
905 #[test]
906 fn test_is_mock_enabled_default() {
907 set_mock_enabled_override(Some(false));
910 assert!(!is_mock_enabled());
911 clear_mock_overrides();
912 }
913
914 #[test]
915 fn test_mock_config_defaults() {
916 let config = MockConfig::default();
917 assert_eq!(config.default_exit_code, 0);
918 assert!(!config.fail_connect);
919 assert!(!config.fail_execute);
920 }
921
922 #[test]
923 fn test_mock_config_connection_failure() {
924 let config = MockConfig::connection_failure();
925 assert!(config.fail_connect);
926 }
927
928 #[test]
929 fn test_mock_config_command_failure() {
930 let config = MockConfig::command_failure(1, "error message");
931 assert_eq!(config.default_exit_code, 1);
932 assert_eq!(config.default_stderr, "error message");
933 }
934
935 #[tokio::test]
936 async fn test_mock_ssh_client_connect() {
937 let worker_config = WorkerConfig {
938 id: WorkerId::new("mock-worker"),
939 host: "mock.host".to_string(),
940 user: "mockuser".to_string(),
941 identity_file: "~/.ssh/mock".to_string(),
942 total_slots: 8,
943 priority: 100,
944 tags: vec![],
945 };
946
947 let mut client = MockSshClient::new_default(worker_config);
948 assert!(!client.is_connected());
949
950 client.connect().await.unwrap();
951 assert!(client.is_connected());
952
953 client.disconnect().await.unwrap();
954 assert!(!client.is_connected());
955 }
956
957 #[tokio::test]
958 async fn test_mock_ssh_client_execute() {
959 let worker_config = WorkerConfig {
960 id: WorkerId::new("mock-worker"),
961 host: "mock.host".to_string(),
962 user: "mockuser".to_string(),
963 identity_file: "~/.ssh/mock".to_string(),
964 total_slots: 8,
965 priority: 100,
966 tags: vec![],
967 };
968
969 let mut client = MockSshClient::new(
970 worker_config,
971 MockConfig::default().with_stdout("test output"),
972 );
973
974 client.connect().await.unwrap();
975
976 let result = client.execute("echo test").await.unwrap();
977 assert_eq!(result.exit_code, 0);
978 assert_eq!(result.stdout, "test output");
979
980 let invocations = client.invocations();
981 assert_eq!(invocations.len(), 2); assert_eq!(invocations[1].command, Some("echo test".to_string()));
983 }
984
985 #[tokio::test]
986 async fn test_mock_ssh_client_connection_failure() {
987 let worker_config = WorkerConfig {
988 id: WorkerId::new("mock-worker"),
989 host: "mock.host".to_string(),
990 user: "mockuser".to_string(),
991 identity_file: "~/.ssh/mock".to_string(),
992 total_slots: 8,
993 priority: 100,
994 tags: vec![],
995 };
996
997 let mut client = MockSshClient::new(worker_config, MockConfig::connection_failure());
998
999 let result = client.connect().await;
1000 assert!(result.is_err());
1001 assert!(!client.is_connected());
1002 }
1003
1004 #[tokio::test]
1005 async fn test_mock_rsync_sync() {
1006 let rsync = MockRsync::new_default();
1007
1008 let result = rsync
1009 .sync_to_remote("/local/path", "user@host:/remote/path", &[])
1010 .await
1011 .unwrap();
1012
1013 assert!(result.files_transferred > 0);
1014 assert!(result.bytes_transferred > 0);
1015
1016 let invocations = rsync.invocations();
1017 assert_eq!(invocations.len(), 1);
1018 assert_eq!(invocations[0].phase, Phase::Sync);
1019 }
1020
1021 #[tokio::test]
1022 async fn test_mock_rsync_failure() {
1023 let rsync = MockRsync::new(MockRsyncConfig::sync_failure());
1024
1025 let result = rsync
1026 .sync_to_remote("/local/path", "user@host:/remote/path", &[])
1027 .await;
1028
1029 assert!(result.is_err());
1030 }
1031
1032 #[test]
1033 fn test_phase_display() {
1034 assert_eq!(format!("{}", Phase::Sync), "SYNC");
1035 assert_eq!(format!("{}", Phase::Execute), "EXEC");
1036 assert_eq!(format!("{}", Phase::Artifacts), "ARTIFACTS");
1037 }
1038
1039 #[test]
1040 fn test_mock_config_toolchain_install_failure() {
1041 let config = MockConfig::toolchain_install_failure();
1042 assert!(config.fail_toolchain_install);
1043 assert!(!config.no_rustup);
1044 }
1045
1046 #[test]
1047 fn test_mock_config_no_rustup() {
1048 let config = MockConfig::no_rustup();
1049 assert!(config.no_rustup);
1050 assert!(!config.fail_toolchain_install);
1051 }
1052
1053 #[tokio::test]
1054 async fn test_mock_ssh_client_no_rustup() {
1055 let worker_config = WorkerConfig {
1056 id: WorkerId::new("mock-worker"),
1057 host: "mock.host".to_string(),
1058 user: "mockuser".to_string(),
1059 identity_file: "~/.ssh/mock".to_string(),
1060 total_slots: 8,
1061 priority: 100,
1062 tags: vec![],
1063 };
1064
1065 let mut client = MockSshClient::new(worker_config, MockConfig::no_rustup());
1066 client.connect().await.unwrap();
1067
1068 let result = client.execute("rustup --version").await.unwrap();
1070 assert_eq!(result.exit_code, 127);
1071 assert!(result.stderr.contains("command not found"));
1072 }
1073
1074 #[tokio::test]
1075 async fn test_mock_ssh_client_toolchain_install_failure() {
1076 let worker_config = WorkerConfig {
1077 id: WorkerId::new("mock-worker"),
1078 host: "mock.host".to_string(),
1079 user: "mockuser".to_string(),
1080 identity_file: "~/.ssh/mock".to_string(),
1081 total_slots: 8,
1082 priority: 100,
1083 tags: vec![],
1084 };
1085
1086 let mut client = MockSshClient::new(worker_config, MockConfig::toolchain_install_failure());
1087 client.connect().await.unwrap();
1088
1089 let result = client
1091 .execute("rustup toolchain install nightly-2024-01-15")
1092 .await
1093 .unwrap();
1094 assert_eq!(result.exit_code, 1);
1095 assert!(result.stderr.contains("is not installed"));
1096
1097 let result = client
1099 .execute("rustup run nightly-2024-01-15 cargo build")
1100 .await
1101 .unwrap();
1102 assert_eq!(result.exit_code, 1);
1103 assert!(result.stderr.contains("is not installed"));
1104 }
1105
1106 #[tokio::test]
1107 async fn test_mock_ssh_client_normal_command_with_toolchain_failure() {
1108 let worker_config = WorkerConfig {
1109 id: WorkerId::new("mock-worker"),
1110 host: "mock.host".to_string(),
1111 user: "mockuser".to_string(),
1112 identity_file: "~/.ssh/mock".to_string(),
1113 total_slots: 8,
1114 priority: 100,
1115 tags: vec![],
1116 };
1117
1118 let mut client = MockSshClient::new(worker_config, MockConfig::toolchain_install_failure());
1119 client.connect().await.unwrap();
1120
1121 let result = client.execute("cargo build").await.unwrap();
1123 assert_eq!(result.exit_code, 0);
1124 }
1125
1126 #[test]
1127 fn test_is_mock_host() {
1128 assert!(is_mock_host("mock://localhost"));
1129 assert!(is_mock_host("mock://worker-1"));
1130 assert!(!is_mock_host("localhost"));
1131 assert!(!is_mock_host("192.168.1.1"));
1132 assert!(!is_mock_host(""));
1133 }
1134
1135 #[test]
1136 fn test_is_mock_worker() {
1137 let mock_worker = WorkerConfig {
1138 id: WorkerId::new("mock-worker"),
1139 host: "mock://localhost".to_string(),
1140 user: "user".to_string(),
1141 identity_file: "~/.ssh/id_rsa".to_string(),
1142 total_slots: 4,
1143 priority: 100,
1144 tags: vec![],
1145 };
1146 assert!(is_mock_worker(&mock_worker));
1147
1148 let real_worker = WorkerConfig {
1149 id: WorkerId::new("real-worker"),
1150 host: "192.168.1.1".to_string(),
1151 user: "user".to_string(),
1152 identity_file: "~/.ssh/id_rsa".to_string(),
1153 total_slots: 4,
1154 priority: 100,
1155 tags: vec![],
1156 };
1157 assert!(!is_mock_worker(&real_worker));
1158 }
1159
1160 #[test]
1161 fn test_mock_config_success() {
1162 let config = MockConfig::success();
1163 assert_eq!(config.default_exit_code, 0);
1164 assert!(!config.fail_connect);
1165 assert!(!config.fail_execute);
1166 assert!(config.default_stdout.is_empty());
1167 assert!(config.default_stderr.is_empty());
1168 }
1169
1170 #[test]
1171 fn test_mock_config_with_command_result() {
1172 let custom_result = CommandResult {
1173 exit_code: 42,
1174 stdout: "custom stdout".to_string(),
1175 stderr: "custom stderr".to_string(),
1176 duration_ms: 100,
1177 };
1178
1179 let config =
1180 MockConfig::success().with_command_result("special_cmd", custom_result.clone());
1181
1182 assert!(config.command_results.contains_key("special_cmd"));
1183 let result = config.command_results.get("special_cmd").unwrap();
1184 assert_eq!(result.exit_code, 42);
1185 assert_eq!(result.stdout, "custom stdout");
1186 }
1187
1188 #[test]
1189 fn test_mock_config_with_stdout() {
1190 let config = MockConfig::default().with_stdout("hello world");
1191 assert_eq!(config.default_stdout, "hello world");
1192 }
1193
1194 #[test]
1195 fn test_mock_rsync_config_success() {
1196 let config = MockRsyncConfig::success();
1197 assert!(!config.fail_sync);
1198 assert!(!config.fail_artifacts);
1199 assert_eq!(config.files_per_sync, 10);
1200 assert_eq!(config.bytes_per_sync, 1024 * 100);
1201 }
1202
1203 #[test]
1204 fn test_mock_rsync_config_default() {
1205 let config = MockRsyncConfig::default();
1206 assert!(!config.fail_sync);
1207 assert!(!config.fail_artifacts);
1208 assert_eq!(config.fail_sync_attempts, 0);
1209 assert_eq!(config.fail_artifacts_attempts, 0);
1210 }
1211
1212 #[test]
1213 fn test_mock_rsync_config_artifact_failure() {
1214 let config = MockRsyncConfig::artifact_failure();
1215 assert!(config.fail_artifacts);
1216 assert!(!config.fail_sync);
1217 }
1218
1219 #[tokio::test]
1220 async fn test_mock_rsync_retrieve_artifacts() {
1221 let rsync = MockRsync::new_default();
1222
1223 let result = rsync
1224 .retrieve_artifacts("user@host:/remote/path", "/local/path", &[])
1225 .await
1226 .unwrap();
1227
1228 assert!(result.files_transferred > 0);
1229 assert!(result.bytes_transferred > 0);
1230
1231 let invocations = rsync.invocations();
1232 assert_eq!(invocations.len(), 1);
1233 assert_eq!(invocations[0].phase, Phase::Artifacts);
1234 }
1235
1236 #[tokio::test]
1237 async fn test_mock_rsync_artifact_failure() {
1238 let rsync = MockRsync::new(MockRsyncConfig::artifact_failure());
1239
1240 let result = rsync
1241 .retrieve_artifacts("user@host:/remote/path", "/local/path", &[])
1242 .await;
1243
1244 assert!(result.is_err());
1245 }
1246
1247 #[test]
1248 fn test_phase_equality() {
1249 assert_eq!(Phase::Sync, Phase::Sync);
1250 assert_eq!(Phase::Execute, Phase::Execute);
1251 assert_ne!(Phase::Sync, Phase::Execute);
1252 assert_ne!(Phase::Artifacts, Phase::Connect);
1253 }
1254
1255 #[test]
1256 fn test_phase_copy() {
1257 let phase = Phase::Disconnect;
1258 let copy = phase; assert_eq!(phase, copy);
1260 }
1261
1262 #[test]
1263 fn test_phase_clone() {
1264 fn assert_clone<T: Clone>() {}
1265 assert_clone::<Phase>();
1266 }
1267
1268 #[test]
1269 fn test_phase_display_all_variants() {
1270 assert_eq!(format!("{}", Phase::Sync), "SYNC");
1271 assert_eq!(format!("{}", Phase::Execute), "EXEC");
1272 assert_eq!(format!("{}", Phase::Artifacts), "ARTIFACTS");
1273 assert_eq!(format!("{}", Phase::Connect), "CONNECT");
1274 assert_eq!(format!("{}", Phase::Disconnect), "DISCONNECT");
1275 }
1276
1277 #[test]
1278 fn test_mock_invocation_debug() {
1279 let invocation = MockInvocation {
1280 worker_id: WorkerId::new("test-worker"),
1281 command: Some("echo hello".to_string()),
1282 phase: Phase::Execute,
1283 timestamp: std::time::SystemTime::now(),
1284 };
1285
1286 let debug = format!("{:?}", invocation);
1287 assert!(debug.contains("MockInvocation"));
1288 assert!(debug.contains("test-worker"));
1289 }
1290
1291 #[test]
1292 fn test_mock_invocation_clone() {
1293 let invocation = MockInvocation {
1294 worker_id: WorkerId::new("worker-1"),
1295 command: None,
1296 phase: Phase::Connect,
1297 timestamp: std::time::SystemTime::now(),
1298 };
1299
1300 let cloned = invocation.clone();
1301 assert_eq!(invocation.phase, cloned.phase);
1302 }
1303
1304 #[test]
1305 fn test_mock_sync_invocation_debug() {
1306 let invocation = MockSyncInvocation {
1307 source: "/local/path".to_string(),
1308 destination: "user@host:/remote".to_string(),
1309 phase: Phase::Sync,
1310 timestamp: std::time::SystemTime::now(),
1311 };
1312
1313 let debug = format!("{:?}", invocation);
1314 assert!(debug.contains("MockSyncInvocation"));
1315 assert!(debug.contains("/local/path"));
1316 }
1317
/// A cloned `MockSyncInvocation` keeps the same source, destination, and phase.
#[test]
fn test_mock_sync_invocation_clone() {
    let original = MockSyncInvocation {
        source: String::from("src"),
        destination: String::from("dst"),
        phase: Phase::Artifacts,
        timestamp: std::time::SystemTime::now(),
    };
    let duplicate = original.clone();

    assert_eq!(original.source, duplicate.source);
    assert_eq!(original.destination, duplicate.destination);
    assert_eq!(original.phase, duplicate.phase);
}
1332
/// The `Debug` output of a `MockRsyncResult` names the type and includes the byte count.
#[test]
fn test_mock_rsync_result_debug() {
    let outcome = MockRsyncResult {
        files_transferred: 5,
        bytes_transferred: 1024,
        duration_ms: 50,
    };

    let rendered = format!("{outcome:?}");
    for fragment in ["MockRsyncResult", "1024"] {
        assert!(rendered.contains(fragment));
    }
}
1345
/// A cloned `MockRsyncResult` matches the original on all three fields.
#[test]
fn test_mock_rsync_result_clone() {
    let original = MockRsyncResult {
        files_transferred: 10,
        bytes_transferred: 2048,
        duration_ms: 100,
    };
    let duplicate = original.clone();

    assert_eq!(original.files_transferred, duplicate.files_transferred);
    assert_eq!(original.bytes_transferred, duplicate.bytes_transferred);
    assert_eq!(original.duration_ms, duplicate.duration_ms);
}
1359
/// After `clear_global_invocations`, both global snapshots are expected to be empty.
#[test]
fn test_global_invocations_clear() {
    clear_global_invocations();

    // Take both snapshots before asserting on either one.
    let (ssh_log, rsync_log) = (
        global_ssh_invocations_snapshot(),
        global_rsync_invocations_snapshot(),
    );

    assert!(ssh_log.is_empty());
    assert!(rsync_log.is_empty());
}
1371
1372 #[tokio::test]
1373 async fn test_mock_ssh_client_execute_not_connected() {
1374 let worker_config = WorkerConfig {
1375 id: WorkerId::new("mock-worker"),
1376 host: "mock.host".to_string(),
1377 user: "mockuser".to_string(),
1378 identity_file: "~/.ssh/mock".to_string(),
1379 total_slots: 8,
1380 priority: 100,
1381 tags: vec![],
1382 };
1383
1384 let client = MockSshClient::new_default(worker_config);
1385 let result = client.execute("echo test").await;
1388 assert!(result.is_err());
1389 let err_msg = result.unwrap_err().to_string();
1390 assert!(err_msg.contains("Not connected"));
1391 }
1392
1393 #[tokio::test]
1394 async fn test_mock_ssh_client_streaming() {
1395 let worker_config = WorkerConfig {
1396 id: WorkerId::new("mock-worker"),
1397 host: "mock.host".to_string(),
1398 user: "mockuser".to_string(),
1399 identity_file: "~/.ssh/mock".to_string(),
1400 total_slots: 8,
1401 priority: 100,
1402 tags: vec![],
1403 };
1404
1405 let mut client = MockSshClient::new(
1406 worker_config,
1407 MockConfig::default().with_stdout("line1\nline2\nline3"),
1408 );
1409 client.connect().await.unwrap();
1410
1411 let mut stdout_lines = Vec::new();
1412 let mut stderr_lines = Vec::new();
1413
1414 let result = client
1415 .execute_streaming(
1416 "echo test",
1417 |line| stdout_lines.push(line.to_string()),
1418 |line| stderr_lines.push(line.to_string()),
1419 )
1420 .await
1421 .unwrap();
1422
1423 assert_eq!(result.exit_code, 0);
1424 assert_eq!(stdout_lines.len(), 3);
1425 }
1426
1427 #[tokio::test]
1428 async fn test_mock_ssh_client_health_check_success() {
1429 let worker_config = WorkerConfig {
1430 id: WorkerId::new("mock-worker"),
1431 host: "mock.host".to_string(),
1432 user: "mockuser".to_string(),
1433 identity_file: "~/.ssh/mock".to_string(),
1434 total_slots: 8,
1435 priority: 100,
1436 tags: vec![],
1437 };
1438
1439 let mut client = MockSshClient::new_default(worker_config);
1440 client.connect().await.unwrap();
1441
1442 let healthy = client.health_check().await.unwrap();
1443 assert!(healthy);
1444 }
1445
1446 #[tokio::test]
1447 async fn test_mock_ssh_client_health_check_failure() {
1448 let worker_config = WorkerConfig {
1449 id: WorkerId::new("mock-worker"),
1450 host: "mock.host".to_string(),
1451 user: "mockuser".to_string(),
1452 identity_file: "~/.ssh/mock".to_string(),
1453 total_slots: 8,
1454 priority: 100,
1455 tags: vec![],
1456 };
1457
1458 let mut client = MockSshClient::new(
1459 worker_config,
1460 MockConfig::command_failure(1, "health check failed"),
1461 );
1462 client.connect().await.unwrap();
1463
1464 let healthy = client.health_check().await.unwrap();
1465 assert!(!healthy);
1466 }
1467
/// `worker_id()` returns the id the client was constructed with.
#[test]
fn test_mock_ssh_client_worker_id() {
    let config = WorkerConfig {
        id: WorkerId::new("my-worker-id"),
        host: String::from("mock.host"),
        user: String::from("mockuser"),
        identity_file: String::from("~/.ssh/mock"),
        total_slots: 8,
        priority: 100,
        tags: Vec::new(),
    };

    let client = MockSshClient::new_default(config);
    let reported_id = client.worker_id();
    assert_eq!(reported_id.as_str(), "my-worker-id");
}
1483
/// A fresh client has no recorded invocations, and clearing leaves the list empty.
#[test]
fn test_mock_ssh_client_clear_invocations() {
    let config = WorkerConfig {
        id: WorkerId::new("mock-worker"),
        host: String::from("mock.host"),
        user: String::from("mockuser"),
        identity_file: String::from("~/.ssh/mock"),
        total_slots: 8,
        priority: 100,
        tags: Vec::new(),
    };
    let client = MockSshClient::new_default(config);

    // Nothing has been recorded yet on a newly built client.
    assert!(client.invocations().is_empty());

    client.clear_invocations();
    assert!(client.invocations().is_empty());
}
1502
/// A cloned `MockConfig` keeps the default stdout and the same number of
/// per-command results.
#[test]
fn test_mock_config_clone() {
    let canned_result = CommandResult {
        exit_code: 0,
        stdout: String::from("out"),
        stderr: String::from("err"),
        duration_ms: 10,
    };
    let original = MockConfig::default()
        .with_stdout("test")
        .with_command_result("cmd", canned_result);

    let duplicate = original.clone();

    assert_eq!(original.default_stdout, duplicate.default_stdout);
    assert_eq!(
        original.command_results.len(),
        duplicate.command_results.len()
    );
}
1521
/// The `Debug` output of a default `MockConfig` includes the type name.
#[test]
fn test_mock_config_debug() {
    let rendered = format!("{:?}", MockConfig::default());
    assert!(rendered.contains("MockConfig"));
}
1528
/// The `Debug` output of a default `MockRsyncConfig` includes the type name.
#[test]
fn test_mock_rsync_config_debug() {
    let rendered = format!("{:?}", MockRsyncConfig::default());
    assert!(rendered.contains("MockRsyncConfig"));
}
1535
/// Cloning a `MockRsyncConfig` must carry every field over unchanged.
///
/// All six fields are compared, including the artifact-failure settings and
/// `bytes_per_sync`, so a clone that dropped any of them is caught.
#[test]
fn test_mock_rsync_config_clone() {
    // Non-default values wherever possible, so a clone that fell back to
    // defaults would differ from the original.
    let config = MockRsyncConfig {
        fail_sync: true,
        fail_sync_attempts: 3,
        fail_artifacts: false,
        fail_artifacts_attempts: 0,
        files_per_sync: 20,
        bytes_per_sync: 5000,
    };

    let cloned = config.clone();

    assert_eq!(config.fail_sync, cloned.fail_sync);
    assert_eq!(config.fail_sync_attempts, cloned.fail_sync_attempts);
    assert_eq!(config.fail_artifacts, cloned.fail_artifacts);
    assert_eq!(
        config.fail_artifacts_attempts,
        cloned.fail_artifacts_attempts
    );
    assert_eq!(config.files_per_sync, cloned.files_per_sync);
    assert_eq!(config.bytes_per_sync, cloned.bytes_per_sync);
}
1552}