ferriskey 0.3.0

Rust client for Valkey, built for FlowFabric. Forked from glide-core (valkey-glide).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
use aws_config::BehaviorVersion;
use aws_credential_types::{Credentials, provider::ProvideCredentials};
use aws_sigv4::http_request::{
    SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
};
use aws_sigv4::sign::v4;
use rand::Rng;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::time::SystemTime;
use strum_macros::IntoStaticStr;
use tokio::sync::{Notify, RwLock};
use tokio::task::JoinHandle;
use tokio::time::{MissedTickBehavior, interval};

use crate::value::{ErrorKind, Error};

/// Maximum refresh interval in seconds (12 hours).
/// `validate_refresh_interval` rejects any configured value above this.
const MAX_REFRESH_INTERVAL_SECONDS: u32 = 12 * 60 * 60; // 43200 seconds
/// Default refresh interval in seconds (5 minutes), used when the caller
/// does not supply one.
const DEFAULT_REFRESH_INTERVAL_SECONDS: u32 = 300; // 300 seconds (5min)
/// Warning threshold for refresh interval in seconds (15 minutes).
/// Intervals at or above this value are still accepted, but
/// `validate_refresh_interval` logs a warning because they increase the risk
/// of the cached token expiring before it is refreshed (the threshold equals
/// `TOKEN_TTL_SECONDS`).
const WARNING_REFRESH_INTERVAL_SECONDS: u32 = 15 * 60; // 900 seconds
/// SigV4 presign expiration (15 minutes); passed as `expires_in` when signing.
pub const TOKEN_TTL_SECONDS: u64 = 15 * 60; // 900

/// Exponential backoff settings for token generation.
/// Total number of generation attempts before the last error is returned.
const TOKEN_GEN_MAX_ATTEMPTS: u32 = 8;
/// Base delay before the first retry; doubled after every failed attempt.
const TOKEN_GEN_INITIAL_BACKOFF_MS: u64 = 100;
/// Safety cap so we never sleep unreasonably long between attempts
/// (applied to the base delay, before jitter is added).
const TOKEN_GEN_MAX_BACKOFF_MS: u64 = 3_000;

/// AWS service type for IAM authentication.
///
/// The `strum` serialization of each variant (obtained through
/// `Into<&'static str>`) is the lowercase service name that this module uses
/// as the SigV4 signing name when generating tokens.
#[derive(Clone, Copy, Debug, PartialEq, Eq, IntoStaticStr)]
pub enum ServiceType {
    /// Amazon ElastiCache service (signing name `"elasticache"`)
    #[strum(serialize = "elasticache")]
    ElastiCache,

    /// Amazon MemoryDB service (signing name `"memorydb"`)
    #[strum(serialize = "memorydb")]
    MemoryDB,
}

/// Check a caller-supplied refresh interval and resolve it to a concrete value.
///
/// * `None` resolves to `DEFAULT_REFRESH_INTERVAL_SECONDS`.
/// * `Some(0)` and anything above `MAX_REFRESH_INTERVAL_SECONDS` are rejected
///   with an `ErrorKind::ClientError`.
/// * Values at or above `WARNING_REFRESH_INTERVAL_SECONDS` are accepted but
///   produce a warning log entry.
///
/// On success the result is always `Ok(Some(_))`.
fn validate_refresh_interval(
    refresh_interval_seconds: Option<u32>,
) -> std::result::Result<Option<u32>, Error> {
    // Nothing supplied: fall back to the default interval.
    let Some(interval) = refresh_interval_seconds else {
        return Ok(Some(DEFAULT_REFRESH_INTERVAL_SECONDS));
    };

    // A zero-length interval is never valid.
    if interval == 0 {
        return Err(Error::from((
            ErrorKind::ClientError,
            "IAM refresh interval validation failed",
            "interval must be at least 1 second, got 0".to_string(),
        )));
    }

    // Enforce the hard upper bound.
    if interval > MAX_REFRESH_INTERVAL_SECONDS {
        return Err(Error::from((
            ErrorKind::ClientError,
            "IAM refresh interval validation failed",
            format!("actual={interval} exceeds max={MAX_REFRESH_INTERVAL_SECONDS}"),
        )));
    }

    // Accepted, but long enough that the token may expire between refreshes.
    if interval >= WARNING_REFRESH_INTERVAL_SECONDS {
        let interval_min = interval / 60;
        let warning_min = WARNING_REFRESH_INTERVAL_SECONDS / 60;
        tracing::warn!(
            "IAM token refresh interval warning - Refresh interval of {interval} seconds ({interval_min}min) exceeds recommended maximum of {WARNING_REFRESH_INTERVAL_SECONDS} seconds ({warning_min}min). \
            This may increase the risk of token expiration. \
            Consider using a shorter interval for better reliability."
        );
    }

    Ok(Some(interval))
}

/// Get AWS credentials using the default credential chain
async fn get_signing_identity(
    region: &str,
    service_type: ServiceType,
) -> std::result::Result<aws_credential_types::Credentials, Error> {
    let config = aws_config::defaults(BehaviorVersion::latest())
        .region(aws_config::Region::new(region.to_string()))
        .load()
        .await;

    let provider = config.credentials_provider().ok_or_else(|| {
        Error::from((
            ErrorKind::ClientError,
            "IAM credentials error",
            "No AWS credentials provider found".to_string(),
        ))
    })?;

    let creds = provider.provide_credentials().await.map_err(|e| {
        Error::from((
            ErrorKind::ClientError,
            "IAM credentials error",
            e.to_string(),
        ))
    })?;

    let service_name: &'static str = service_type.into();
    Ok(Credentials::new(
        creds.access_key_id(),
        creds.secret_access_key(),
        creds.session_token().map(|s| s.to_string()),
        creds.expiry(),
        service_name,
    ))
}

/// Internal state structure for IAM token management.
///
/// Holds the immutable configuration needed to generate a token; it is cloned
/// into the background refresh task and into token handles.
#[derive(Clone, Debug)]
pub(crate) struct IamTokenState {
    /// AWS region used both to load credentials and to sign requests
    region: String,
    /// ElastiCache/MemoryDB cluster name; used as the host of the presigned
    /// URL and as the `host` header when generating a token
    cluster_name: String,
    /// Username for the connection; URL-encoded into the `User` query parameter
    username: String,
    /// Service type (ElastiCache or MemoryDB); determines the signing name
    service_type: ServiceType,
    /// Token refresh interval in seconds (already validated, never zero when
    /// built through `IAMTokenManager::new`)
    refresh_interval_seconds: u32,
}

/// IAM-based token manager for ElastiCache/MemoryDB.
///
/// - Tokens: valid 15m, refreshed every 5m by default.
/// - Refresh: periodic, uses exponential backoff with ±20% jitter on failures.
/// - Failures: logged only; cached token stays valid until expiry.
/// - Thread-safe via `Arc<RwLock<...>>` for token cache and `Arc<AtomicBool>` for change notification.
///
/// The background refresh does not start automatically: call
/// `start_refresh_task` after construction, and `stop_refresh_task` to wait
/// for it to finish.
pub struct IAMTokenManager {
    /// Cached auth token, stored in an `Arc<RwLock<String>>` to allow many concurrent readers,
    /// safe exclusive writes on refresh, and shared access across async tasks.
    cached_token: Arc<RwLock<String>>,
    /// Timestamp of when the cached token was last generated/refreshed.
    /// Updated under its own lock, immediately after the token itself.
    token_created_at: Arc<RwLock<tokio::time::Instant>>,
    /// IAM token state containing all configuration
    iam_token_state: IamTokenState,
    /// Background refresh task handle; `None` until `start_refresh_task` is
    /// called and again after `stop_refresh_task` takes it.
    refresh_task: Option<JoinHandle<()>>,
    /// Shutdown signal for graceful task termination
    shutdown_notify: Arc<Notify>,
    /// Atomic flag to signal when token has changed (for efficient change detection).
    /// Starts as `true` and is set again after every successful refresh;
    /// cleared through `clear_token_changed`.
    token_changed: Arc<AtomicBool>,
}

/// Hand-written `Debug` for `IAMTokenManager`.
///
/// The cached token and the notifier are rendered as fixed placeholders, so the
/// token value never appears in debug output; the refresh task is shown only as
/// a present/absent flag.
impl std::fmt::Debug for IAMTokenManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_refresh_task = self.refresh_task.is_some();
        let changed = self.token_changed.load(Ordering::Relaxed);

        let mut out = f.debug_struct("IAMTokenManager");
        out.field("cached_token", &"<RwLock<String>>");
        out.field("iam_token_state", &self.iam_token_state);
        out.field("refresh_task", &has_refresh_task);
        out.field("shutdown_notify", &"<Notify>");
        out.field("token_changed", &changed);
        out.finish()
    }
}

impl IAMTokenManager {
    /// Build a token manager and eagerly generate its first token.
    ///
    /// # Arguments
    /// * `cluster_name` - The ElastiCache/MemoryDB cluster name
    /// * `username` - Username for authentication
    /// * `region` - AWS region of the cluster
    /// * `service_type` - Service type (ElastiCache or MemoryDB)
    /// * `refresh_interval_seconds` - Optional refresh interval in seconds.
    ///   `None` selects the 5 minute default (300 seconds); `0` and values
    ///   above 12 hours (43200 seconds) are rejected; values of 15 minutes
    ///   (900 seconds) or more are accepted with a logged warning.
    ///
    /// # Errors
    /// Fails when the interval is invalid or when the initial token cannot be
    /// generated after all retry attempts.
    ///
    /// The background refresh task is not started here; see
    /// `start_refresh_task`.
    pub async fn new(
        cluster_name: String,
        username: String,
        region: String,
        service_type: ServiceType,
        refresh_interval_seconds: Option<u32>,
    ) -> std::result::Result<Self, Error> {
        let refresh_interval_seconds = validate_refresh_interval(refresh_interval_seconds)?
            .unwrap_or(DEFAULT_REFRESH_INTERVAL_SECONDS);

        let iam_token_state = IamTokenState {
            region,
            cluster_name,
            username,
            service_type,
            refresh_interval_seconds,
        };

        // The manager is only constructed once a first token is available.
        let initial_token = Self::generate_token_with_backoff(&iam_token_state).await?;
        let cached_token = Arc::new(RwLock::new(initial_token));
        let token_created_at = Arc::new(RwLock::new(tokio::time::Instant::now()));

        Ok(Self {
            cached_token,
            token_created_at,
            iam_token_state,
            refresh_task: None,
            shutdown_notify: Arc::new(Notify::new()),
            // Starts out set so that the first AUTH is triggered.
            token_changed: Arc::new(AtomicBool::new(true)),
        })
    }

    /// Spawn the periodic token refresh task on the current tokio runtime.
    ///
    /// Does nothing when a task handle is already stored. The spawned task
    /// receives a clone of the configuration plus shared handles to the token
    /// cache, its timestamp, the shutdown notifier and the change flag.
    pub fn start_refresh_task(&mut self) {
        if self.refresh_task.is_none() {
            let handle = tokio::spawn(Self::token_refresh_task(
                self.iam_token_state.clone(),
                Arc::clone(&self.cached_token),
                Arc::clone(&self.token_created_at),
                Arc::clone(&self.shutdown_notify),
                Arc::clone(&self.token_changed),
            ));
            self.refresh_task = Some(handle);
        }
    }

    /// Body of the background refresh task.
    ///
    /// Refreshes the cached token once per configured interval until the
    /// shutdown notifier fires. Missed ticks are skipped rather than replayed,
    /// and refresh failures are ignored here (they are logged by
    /// `handle_token_refresh`).
    async fn token_refresh_task(
        iam_token_state: IamTokenState,
        cached_token: Arc<RwLock<String>>,
        token_created_at: Arc<RwLock<tokio::time::Instant>>,
        shutdown_notify: Arc<Notify>,
        token_changed: Arc<AtomicBool>,
    ) {
        let period = Duration::from_secs(iam_token_state.refresh_interval_seconds as u64);
        let mut ticker = interval(period);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        // Consume the first tick up front: a token was already generated when
        // the manager was constructed, so no refresh is needed yet.
        ticker.tick().await;

        loop {
            tokio::select! {
                _ = ticker.tick() => {
                    // Errors are deliberately dropped; the old token stays cached.
                    let _ = Self::handle_token_refresh(
                        &iam_token_state,
                        &cached_token,
                        &token_created_at,
                        &token_changed,
                    )
                    .await;
                }
                _ = shutdown_notify.notified() => {
                    tracing::info!("IAM token refresh task shutting down");
                    break;
                }
            }
        }
    }

    /// Refresh the cached token, retrying with backoff + jitter.
    ///
    /// # Arguments
    /// * `iam_token_state` - configuration used to generate the new token
    /// * `cached_token` - shared token cache to overwrite on success
    /// * `token_created_at` - shared timestamp, reset to "now" on success
    /// * `token_changed` - flag raised (with `Release` ordering) on success
    ///
    /// # Returns
    /// * `Ok(())` once the cache, timestamp and flag have all been updated.
    /// * `Err(_)` with the last generation error; the failure is logged and
    ///   the previously cached token is left untouched.
    async fn handle_token_refresh(
        iam_token_state: &IamTokenState,
        cached_token: &Arc<RwLock<String>>,
        token_created_at: &Arc<RwLock<tokio::time::Instant>>,
        token_changed: &Arc<AtomicBool>,
    ) -> std::result::Result<(), Error> {
        match Self::generate_token_with_backoff(iam_token_state).await {
            Ok(new_token) => {
                // The token is moved into the cache; it is not needed afterwards,
                // so no extra copy of the secret is made.
                Self::set_cached_token_static(cached_token, new_token).await;
                {
                    // Scoped so the write guard is released before the flag is set.
                    let mut ts = token_created_at.write().await;
                    *ts = tokio::time::Instant::now();
                }
                // Publish the change only after token and timestamp are in place.
                token_changed.store(true, Ordering::Release);
                Ok(())
            }
            Err(err) => {
                // Leave cached token unchanged; logs already emitted in backoff routine
                tracing::error!("IAM token refresh failed - Could not refresh token after backoff: {err}");
                Err(err)
            }
        }
    }

    /// Generate a token, retrying failed attempts with exponential backoff.
    ///
    /// At most `TOKEN_GEN_MAX_ATTEMPTS` attempts are made. After each failure
    /// (except the last) the task sleeps for the current base delay ±20%
    /// jitter; the base delay then doubles, capped at
    /// `TOKEN_GEN_MAX_BACKOFF_MS`. Jitter is applied only to the sleep, never
    /// fed back into the base delay.
    ///
    /// Returns the token from the first successful attempt, or the error from
    /// the final attempt once all attempts are exhausted.
    pub(crate) async fn generate_token_with_backoff(
        state: &IamTokenState,
    ) -> std::result::Result<String, Error> {
        let mut base_delay_ms = TOKEN_GEN_INITIAL_BACKOFF_MS;
        let mut failures: u32 = 0;

        loop {
            let e = match Self::generate_token_static(state).await {
                Ok(token) => return Ok(token),
                Err(e) => e,
            };

            failures += 1;
            if failures >= TOKEN_GEN_MAX_ATTEMPTS {
                tracing::error!("IAM token generation failed - Exhausted {TOKEN_GEN_MAX_ATTEMPTS} attempts with exponential backoff. error: {e}");
                return Err(e);
            }

            // Pick a sleep uniformly within ±20% of the base delay. The RNG is a
            // temporary of this statement, so it is gone before the next await.
            let spread = (base_delay_ms as f64 * 0.2) as u64;
            let lower = base_delay_ms.saturating_sub(spread);
            let upper = base_delay_ms.saturating_add(spread);
            let sleep_ms = rand::rng().random_range(lower..=upper);

            tracing::warn!("IAM token generation failed - {e}. Retrying in {sleep_ms}ms");

            tokio::time::sleep(Duration::from_millis(sleep_ms)).await;

            // Grow the un-jittered base delay for the next round.
            base_delay_ms = base_delay_ms.saturating_mul(2).min(TOKEN_GEN_MAX_BACKOFF_MS);
        }
    }

    /// Force refresh the token immediately
    ///
    /// Returns an error if token generation fails after retries.
    pub async fn refresh_token(&self) -> std::result::Result<(), Error> {
        Self::handle_token_refresh(
            &self.iam_token_state,
            &self.cached_token,
            &self.token_created_at,
            &self.token_changed,
        )
        .await
    }

    /// Ask the background refresh task to stop and wait briefly for it.
    ///
    /// No-op when no task handle is stored. Otherwise the shutdown notifier is
    /// signalled and the task is awaited for up to 5 seconds; the outcome of
    /// that wait (including a timeout) is ignored.
    pub async fn stop_refresh_task(&mut self) {
        let Some(task) = self.refresh_task.take() else {
            return;
        };

        self.shutdown_notify.notify_one();

        // Bounded wait so a slow in-flight refresh cannot block the caller forever.
        let grace_period = Duration::from_secs(5);
        let _ = tokio::time::timeout(grace_period, task).await;
    }

    /// Overwrite the shared token cache with `new_token`.
    ///
    /// Associated function (no `self`) so the background task can call it with
    /// only the shared cache handle. Waits for the write lock.
    async fn set_cached_token_static(cached_token: &Arc<RwLock<String>>, new_token: String) {
        *cached_token.write().await = new_token;
    }

    /// Return a copy of the currently cached token.
    ///
    /// Takes the read lock only for the duration of the clone.
    pub async fn get_token(&self) -> String {
        self.cached_token.read().await.clone()
    }

    /// Report whether the change flag is currently set.
    ///
    /// The flag is read with `Acquire` ordering and is not modified; use
    /// `clear_token_changed` to reset it.
    pub fn token_changed(&self) -> bool {
        let changed = self.token_changed.load(Ordering::Acquire);
        changed
    }

    /// Reset the change flag once the new token has been handled.
    ///
    /// The flag is written with `Release` ordering.
    pub fn clear_token_changed(&self) {
        self.token_changed.store(false, Ordering::Release);
    }

    /// Build a lightweight handle onto the token cache for the reconnection path.
    ///
    /// The handle shares this manager's token and timestamp `Arc`s, so a
    /// refresh done by the background task is visible through it without a
    /// reference back to the full `IAMTokenManager`. The configuration is
    /// copied into the handle.
    pub fn get_token_handle(&self) -> crate::client::IAMTokenHandle {
        let cached_token = Arc::clone(&self.cached_token);
        let token_created_at = Arc::clone(&self.token_created_at);
        let iam_token_state = self.iam_token_state.clone();

        crate::client::IAMTokenHandle {
            cached_token,
            token_created_at,
            iam_token_state,
        }
    }

    /// Generate IAM authentication token using SigV4 signing (valid for 15 minutes).
    ///
    /// A `GET https://<cluster_name>/?Action=connect&User=<username>` request
    /// is presigned with the signature placed in the query string and an
    /// expiry of `TOKEN_TTL_SECONDS`. The token is the resulting signed URI
    /// with its `https://` scheme removed.
    ///
    /// # Errors
    /// Returns `ErrorKind::ClientError` when credentials cannot be resolved or
    /// when any step of building or signing the request fails.
    async fn generate_token_static(state: &IamTokenState) -> std::result::Result<String, Error> {
        let service_name: &'static str = state.service_type.into();
        let signing_time = SystemTime::now();
        // Borrow the cluster name; it is only read, so no per-call copy is needed.
        let hostname: &str = &state.cluster_name;
        let base_url = build_base_url(hostname, &state.username);

        // Fetch fresh credentials on every token generation to handle credential rotation
        // (e.g., EC2 instance profile credentials rotate every ~6 hours)
        let creds = get_signing_identity(&state.region, state.service_type).await?;
        let identity_value = creds.into();

        // Presign: signature goes into the query string and carries an expiry.
        let mut signing_settings = SigningSettings::default();
        signing_settings.signature_location = SignatureLocation::QueryParams;
        signing_settings.expires_in = Some(Duration::from_secs(TOKEN_TTL_SECONDS));

        let signing_params = v4::SigningParams::builder()
            .identity(&identity_value)
            .region(&state.region)
            .name(service_name)
            .time(signing_time)
            .settings(signing_settings)
            .build()
            .map_err(|e| {
                Error::from((
                    ErrorKind::ClientError,
                    "IAM token generation failed",
                    format!("Failed to build signing params: {e}"),
                ))
            })?
            .into();

        // Create signable request with the simple hostname (no headers, empty body)
        let signable_request = SignableRequest::new(
            "GET",
            &base_url,
            std::iter::empty(),
            SignableBody::Bytes(b""),
        )
        .map_err(|e| {
            Error::from((
                ErrorKind::ClientError,
                "IAM token generation failed",
                format!("Failed to create signable request: {e}"),
            ))
        })?;

        // Sign the request (with presigning settings, this will generate query parameters)
        let (instructions, _sig) = sign(signable_request, &signing_params)
            .map_err(|e| {
                Error::from((
                    ErrorKind::ClientError,
                    "IAM token generation failed",
                    format!("Failed to sign: {e}"),
                ))
            })?
            .into_parts();

        // Build a temporary HTTP request to apply the signing instructions
        let mut req = http::Request::builder()
            .method("GET")
            .uri(&base_url)
            .header("host", hostname)
            .body(())
            .map_err(|e| {
                Error::from((
                    ErrorKind::ClientError,
                    "IAM token generation failed",
                    format!("Build HTTP request failed: {e}"),
                ))
            })?;

        instructions.apply_to_request_http1x(&mut req);

        // Extract the token from the signed request URI
        let token = strip_scheme(req.uri().to_string());
        // Only the event is logged, never the token itself.
        tracing::debug!("Generated new IAM token");
        Ok(token)
    }
}

/// Best-effort shutdown of the background refresh task when the manager is dropped.
impl Drop for IAMTokenManager {
    fn drop(&mut self) {
        // Signal shutdown to the background task
        self.shutdown_notify.notify_one();

        // Note: We can't await in Drop, so this only signals and cannot wait
        // for the task. Dropping the `JoinHandle` held in `refresh_task` detaches
        // the task rather than cancelling it; the task ends on its own when its
        // select loop observes the notification. Call `stop_refresh_task()`
        // beforehand to wait for the task to finish.
    }
}

/// Assemble the URL that gets presigned for `hostname` and `username`.
///
/// The username is percent-encoded before being placed in the `User` query
/// parameter; the hostname is inserted verbatim.
fn build_base_url(hostname: &str, username: &str) -> String {
    let encoded_user = urlencoding::encode(username);
    format!("https://{}/?Action=connect&User={}", hostname, encoded_user)
}

/// Drop a leading `https://` or `http://` from `full`.
///
/// `https://` is tried first and at most one prefix is removed. Input that
/// starts with neither scheme is returned unchanged.
fn strip_scheme(full: String) -> String {
    for scheme in ["https://", "http://"] {
        if let Some(rest) = full.strip_prefix(scheme) {
            return rest.to_string();
        }
    }
    full
}

#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::env;
    use std::fs;
    use std::sync::Once;
    use tokio::time::{Duration, sleep};

    const IAM_TOKENS_JSON: &str = "/tmp/iam_tokens.json";

    // This ensures the file is deleted once before all tests
    static INIT: Once = Once::new();

    /// Runs the one-time test setup: deletes any token log left at `IAM_TOKENS_JSON`.
    ///
    /// The work is guarded by `INIT`, so repeated calls after the first do nothing.
    fn initialize_test_environment() {
        let remove_stale_log = || {
            // Best-effort removal: the result is discarded, so a missing file is not an error.
            let _ = std::fs::remove_file(IAM_TOKENS_JSON);
            tracing::info!("Test setup - Cleaned up old IAM token log file");
        };

        INIT.call_once(remove_stale_log);
    }

    /// Exports mock AWS credentials through the process environment for the tests.
    ///
    /// Sets `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` and `AWS_SESSION_TOKEN` to fixed
    /// placeholder values.
    fn setup_test_credentials() {
        const MOCK_CREDENTIALS: [(&str, &str); 3] = [
            ("AWS_ACCESS_KEY_ID", "test_access_key"),
            ("AWS_SECRET_ACCESS_KEY", "test_secret_key"),
            ("AWS_SESSION_TOKEN", "test_session_token"),
        ];

        for &(name, value) in &MOCK_CREDENTIALS {
            // SAFETY: mutating the environment is only sound while no other thread accesses
            // it. The tests in this module are all marked `#[serial]`, so they should not
            // race with each other — NOTE(review): confirm nothing else in the test binary
            // touches the environment concurrently.
            unsafe { env::set_var(name, value) };
        }
    }

    /// Appends one token record to the JSON log at `IAM_TOKENS_JSON` for manual inspection.
    ///
    /// The record holds the test name, the token, the fields of `state`, and the current Unix
    /// time in seconds. File handling is best-effort: a log that cannot be read or parsed is
    /// treated as empty, and serialization or write failures are ignored.
    ///
    /// # Panics
    /// Panics if the system clock reports a time earlier than the Unix epoch.
    fn save_token_to_file(test_name: &str, token: &str, state: &IamTokenState) {
        let recorded_at = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let service_type = format!("{:?}", state.service_type);

        let entry = serde_json::json!({
            "test_name": test_name,
            "token": token,
            "region": state.region,
            "cluster_name": state.cluster_name,
            "username": state.username,
            "service_type": service_type,
            "refresh_interval_seconds": state.refresh_interval_seconds,
            "timestamp": recorded_at
        });

        // Start from whatever is already logged; fall back to an empty list otherwise.
        let mut entries: Vec<serde_json::Value> = fs::read_to_string(IAM_TOKENS_JSON)
            .ok()
            .and_then(|content| serde_json::from_str(&content).ok())
            .unwrap_or_default();
        entries.push(entry);

        // Rewrite the whole file with the extended list.
        if let Ok(pretty) = serde_json::to_string_pretty(&entries) {
            let _ = fs::write(IAM_TOKENS_JSON, pretty);
        }
    }

    /// Builds an `IamTokenState` from the given identifiers for use in test logging.
    ///
    /// The string arguments are copied into owned fields and the refresh interval is always
    /// `DEFAULT_REFRESH_INTERVAL_SECONDS`.
    fn create_test_state(
        region: &str,
        cluster_name: &str,
        username: &str,
        service_type: ServiceType,
    ) -> IamTokenState {
        let refresh_interval_seconds = DEFAULT_REFRESH_INTERVAL_SECONDS;

        IamTokenState {
            region: region.to_owned(),
            cluster_name: cluster_name.to_owned(),
            username: username.to_owned(),
            service_type,
            refresh_interval_seconds,
        }
    }

    /// Checks the `token_changed` flag across creation, an explicit clear, and a background
    /// refresh.
    #[tokio::test]
    #[serial]
    async fn test_iam_token_manager_with_atomic_flag() {
        initialize_test_environment();
        setup_test_credentials();

        // A 2-second refresh interval keeps the test short.
        let refresh_interval_seconds = Some(2);
        let mut manager = IAMTokenManager::new(
            "test-cluster".to_string(),
            "test-user".to_string(),
            "us-east-1".to_string(),
            ServiceType::ElastiCache,
            refresh_interval_seconds,
        )
        .await
        .unwrap();

        // The flag is expected to start out set on a freshly created manager.
        let changed_initially = manager.token_changed();
        assert!(changed_initially, "Initial token_changed should be true");

        // Clearing must reset it.
        manager.clear_token_changed();
        let changed_after_clear = manager.token_changed();
        assert!(
            !changed_after_clear,
            "After clear, token_changed should be false"
        );

        // Let the background task run for longer than one refresh interval.
        manager.start_refresh_task();
        sleep(Duration::from_secs(3)).await;

        // A background refresh is expected to set the flag again.
        let changed_after_refresh = manager.token_changed();
        assert!(
            changed_after_refresh,
            "After refresh, token_changed should be true"
        );

        manager.stop_refresh_task().await;

        tracing::info!("Test completed successfully! - Atomic flag working as expected");
    }

    /// Checks that an explicit `refresh_token` call sets the `token_changed` flag again.
    #[tokio::test]
    #[serial]
    async fn test_iam_token_manager_manual_refresh_sets_flag() {
        initialize_test_environment();
        setup_test_credentials();

        // `None` leaves the refresh interval at the manager's default.
        let manager = IAMTokenManager::new(
            "test-cluster".to_string(),
            "test-user".to_string(),
            "us-east-1".to_string(),
            ServiceType::ElastiCache,
            None,
        )
        .await
        .unwrap();

        // Start from a cleared flag so the refresh is the only thing that can set it.
        manager.clear_token_changed();
        let changed_after_clear = manager.token_changed();
        assert!(!changed_after_clear, "Flag should be false after clear");

        let refresh_outcome = manager.refresh_token().await;
        refresh_outcome.expect("refresh_token should succeed in test");

        let changed_after_refresh = manager.token_changed();
        assert!(
            changed_after_refresh,
            "Flag should be true after manual refresh"
        );

        tracing::info!("Manual refresh test completed successfully!");
    }

    /// Checks that a newly created manager already holds a non-empty token that begins with
    /// the cluster name.
    #[tokio::test]
    #[serial]
    async fn test_iam_token_manager_new_creates_initial_token() {
        initialize_test_environment();
        setup_test_credentials();

        let cluster_name = String::from("test-cluster");
        let username = String::from("test-user");
        let region = String::from("us-east-1");

        let creation = IAMTokenManager::new(
            cluster_name.clone(),
            username.clone(),
            region.clone(),
            ServiceType::ElastiCache,
            None,
        )
        .await;
        assert!(creation.is_ok(), "IAMTokenManager creation should succeed");

        let manager = creation.unwrap();
        let token = manager.get_token().await;

        // Record the token before asserting on it, so it is logged even if a check fails.
        let state = create_test_state(&region, &cluster_name, &username, ServiceType::ElastiCache);
        save_token_to_file(
            "test_iam_token_manager_new_creates_initial_token",
            &token,
            &state,
        );

        assert!(!token.is_empty(), "Initial token should not be empty");

        let expected_prefix = format!("{cluster_name}/");
        assert!(
            token.starts_with(expected_prefix.as_str()),
            "Token should start with cluster name"
        );
    }

    /// Checks that two consecutive `get_token` calls, with no refresh in between, return equal
    /// tokens.
    #[tokio::test]
    #[serial]
    async fn test_iam_token_manager_get_token_returns_cached_token() {
        initialize_test_environment();
        setup_test_credentials();

        let manager = IAMTokenManager::new(
            "test-cluster".to_string(),
            "test-user".to_string(),
            "us-east-1".to_string(),
            ServiceType::ElastiCache,
            None,
        )
        .await
        .unwrap();

        let first_read = manager.get_token().await;
        let second_read = manager.get_token().await;

        assert_eq!(
            first_read, second_read,
            "get_token should return the same cached token"
        );
    }

    /// Checks that `refresh_token` replaces the stored token with a different one that keeps
    /// the cluster-name prefix.
    #[tokio::test]
    #[serial]
    async fn test_iam_token_manager_refresh_token_updates_cached_token() {
        initialize_test_environment();
        setup_test_credentials();

        let cluster_name = String::from("test-cluster");
        let username = String::from("test-user");
        let region = String::from("us-east-1");

        let manager = IAMTokenManager::new(
            cluster_name.clone(),
            username.clone(),
            region.clone(),
            ServiceType::ElastiCache,
            None,
        )
        .await
        .unwrap();

        // The same descriptive state accompanies both log entries below.
        let state = create_test_state(&region, &cluster_name, &username, ServiceType::ElastiCache);

        let initial_token = manager.get_token().await;
        save_token_to_file(
            "test_iam_token_manager_refresh_token_updates_cached_token_initial",
            &initial_token,
            &state,
        );

        // The test relies on the signing timestamp changing between the two tokens, so it
        // waits a full second before refreshing.
        sleep(Duration::from_secs(1)).await;

        let refresh_outcome = manager.refresh_token().await;
        refresh_outcome.expect("refresh_token should succeed in test");

        let new_token = manager.get_token().await;
        save_token_to_file(
            "test_iam_token_manager_refresh_token_updates_cached_token_refreshed",
            &new_token,
            &state,
        );

        assert_ne!(
            initial_token, new_token,
            "Refreshed token should be different from initial token"
        );

        let expected_prefix = format!("{cluster_name}/");
        assert!(
            new_token.starts_with(expected_prefix.as_str()),
            "New token should still start with cluster name"
        );
    }

    /// Checks that the refresh task handle is present after starting (including a repeated
    /// start) and absent after stopping.
    #[tokio::test]
    #[serial]
    async fn test_iam_token_manager_start_and_stop_refresh_task() {
        initialize_test_environment();
        setup_test_credentials();

        // The interval is expressed in seconds; 1 second keeps the test fast.
        let refresh_interval_seconds = Some(1);
        let mut manager = IAMTokenManager::new(
            "test-cluster".to_string(),
            "test-user".to_string(),
            "us-east-1".to_string(),
            ServiceType::ElastiCache,
            refresh_interval_seconds,
        )
        .await
        .unwrap();

        // First start: a task handle must now be stored.
        manager.start_refresh_task();
        let running_after_start = manager.refresh_task.is_some();
        assert!(running_after_start, "Refresh task should be started");

        // Second start: a handle must still be stored. This only checks presence, not that
        // the handle is the same one.
        manager.start_refresh_task();
        let running_after_restart = manager.refresh_task.is_some();
        assert!(running_after_restart, "Refresh task should still exist");

        // Stopping must clear the stored handle.
        manager.stop_refresh_task().await;
        let cleared_after_stop = manager.refresh_task.is_none();
        assert!(cleared_after_stop, "Refresh task should be stopped");
    }

    /// Checks which refresh intervals `IAMTokenManager::new` accepts and which it rejects
    /// with a `ClientError`.
    #[tokio::test]
    #[serial]
    async fn test_iam_token_manager_refresh_interval_validation() {
        initialize_test_environment();
        setup_test_credentials();

        let cluster_name = String::from("test-cluster");
        let username = String::from("test-user");
        let region = String::from("us-east-1");

        // Accepted values, in seconds: 1 minute, 15 minutes, 6 hours, 12 hours (the maximum).
        for interval in [60, 900, 21600, 43200] {
            let result = IAMTokenManager::new(
                cluster_name.clone(),
                username.clone(),
                region.clone(),
                ServiceType::ElastiCache,
                Some(interval),
            )
            .await;

            assert!(
                result.is_ok(),
                "IAMTokenManager creation should succeed with valid interval: {interval} seconds"
            );
        }

        // Zero lies below the accepted range and must be rejected.
        let zero_result = IAMTokenManager::new(
            cluster_name.clone(),
            username.clone(),
            region.clone(),
            ServiceType::ElastiCache,
            Some(0),
        )
        .await;
        assert!(
            zero_result.is_err(),
            "IAMTokenManager creation should fail with interval 0"
        );

        let zero_error = zero_result.unwrap_err();
        assert_eq!(zero_error.kind(), ErrorKind::ClientError);

        // NOTE(review): this check is weak — any detail text containing the digit 0 passes.
        let detail = zero_error.detail().unwrap_or_default();
        assert!(
            detail.contains('0'),
            "Expected '0' in error detail, got: {detail}"
        );

        // Rejected values, in seconds: just over 12 hours, 24 hours, 48 hours.
        for interval in [43201, 86400, 172800] {
            let result = IAMTokenManager::new(
                cluster_name.clone(),
                username.clone(),
                region.clone(),
                ServiceType::ElastiCache,
                Some(interval),
            )
            .await;

            assert!(
                result.is_err(),
                "IAMTokenManager creation should fail with invalid interval: {interval} seconds"
            );

            let error = result.unwrap_err();
            assert_eq!(error.kind(), ErrorKind::ClientError);

            // The detail text must mention both the rejected value and the allowed maximum.
            let detail = error.detail().unwrap_or_default();
            let rejected_value = interval.to_string();
            assert!(
                detail.contains(rejected_value.as_str()),
                "Expected interval value in error detail, got: {detail}"
            );
            let max_value = MAX_REFRESH_INTERVAL_SECONDS.to_string();
            assert!(
                detail.contains(max_value.as_str()),
                "Expected max interval value in error detail, got: {detail}"
            );
        }
    }

    /// Checks that the background refresh task replaces the token on every refresh interval.
    ///
    /// The test reads the initial token, starts the refresh task, and samples the token twice,
    /// each time after waiting one interval plus a one-second buffer. All three tokens must
    /// differ from each other and carry the expected presigned-URL components. Each token is
    /// also appended to the inspection log.
    #[tokio::test]
    #[serial]
    async fn test_iam_token_manager_generates_new_token_every_x_seconds() {
        initialize_test_environment();
        setup_test_credentials();

        // Refresh interval under test; every wait below is derived from it.
        const REFRESH_TIME_SECONDS: u32 = 2;
        // Prefix for the labels written to the inspection log; matches this test's name.
        const TEST_NAME: &str = "test_iam_token_manager_generates_new_token_every_x_seconds";

        let cluster_name = "test-cluster".to_string();
        let username = "test-user".to_string();
        let region = "us-east-1".to_string();

        // Create the manager with the short refresh interval.
        let mut manager = IAMTokenManager::new(
            cluster_name.clone(),
            username.clone(),
            region.clone(),
            ServiceType::ElastiCache,
            Some(REFRESH_TIME_SECONDS),
        )
        .await
        .unwrap();

        // One interval plus a one-second buffer, so a refresh falls inside each wait.
        let wait_for_refresh = Duration::from_secs(u64::from(REFRESH_TIME_SECONDS) + 1);

        // Get the initial token.
        let initial_token = manager.get_token().await;
        assert!(
            !initial_token.is_empty(),
            "Initial token should not be empty"
        );

        // Save the initial token to the JSON file for inspection.
        let state = create_test_state(&region, &cluster_name, &username, ServiceType::ElastiCache);
        save_token_to_file(&format!("{TEST_NAME}_initial"), &initial_token, &state);

        // Start the refresh task.
        manager.start_refresh_task();

        // Wait for the first refresh.
        sleep(wait_for_refresh).await;

        let first_refresh_token = manager.get_token().await;
        assert_ne!(
            initial_token, first_refresh_token,
            "Token should be different after first refresh interval"
        );

        // Save the first refreshed token to the JSON file for inspection.
        save_token_to_file(
            &format!("{TEST_NAME}_first_refresh"),
            &first_refresh_token,
            &state,
        );

        // Wait for the second refresh.
        sleep(wait_for_refresh).await;

        let second_refresh_token = manager.get_token().await;
        assert_ne!(
            first_refresh_token, second_refresh_token,
            "Token should be different after second refresh interval"
        );
        assert_ne!(
            initial_token, second_refresh_token,
            "Second refresh token should be different from initial token"
        );

        // Save the second refreshed token to the JSON file for inspection.
        save_token_to_file(
            &format!("{TEST_NAME}_second_refresh"),
            &second_refresh_token,
            &state,
        );

        // Verify that all tokens have the expected format.
        let expected_prefix = format!("{cluster_name}/");
        for (name, token) in [
            ("initial", &initial_token),
            ("first_refresh", &first_refresh_token),
            ("second_refresh", &second_refresh_token),
        ] {
            assert!(
                token.starts_with(expected_prefix.as_str()),
                "{name} token should start with cluster name"
            );
            assert!(
                token.contains("Action=connect"),
                "{name} token should contain Action=connect"
            );
            assert!(
                token.contains("X-Amz-Expires=900"),
                "{name} token should contain 15-minute expiration"
            );
            assert!(
                token.contains("X-Amz-Signature="),
                "{name} token should contain X-Amz-Signature parameter"
            );
        }

        // Stop the refresh task explicitly instead of leaving it to the manager's `Drop`.
        manager.stop_refresh_task().await;
    }
}