1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
use futures_util::stream::StreamExt;
use rs2_stream::error::RetryPolicy;
use rs2_stream::rs2::*;
use std::time::{Duration, Instant};
use tokio::runtime::Runtime;
/// Exercises `retry_with_policy_rs2` with `RetryPolicy::Immediate { max_retries: 2 }`.
#[test]
fn test_retry_with_policy_immediate() {
    let runtime = Runtime::new().unwrap();
    runtime.block_on(async {
        // One recipe produces both the initial stream and every retry attempt,
        // so all attempts see the same sequence of items.
        let make_stream = || from_iter(vec![Ok(1), Err("error1"), Ok(3), Err("error2"), Ok(5)]);

        let retry_policy = RetryPolicy::Immediate { max_retries: 2 };

        let collected: Vec<_> = make_stream()
            .retry_with_policy_rs2(retry_policy, make_stream)
            .collect()
            .await;

        // The expected output holds three passes (the initial one plus two
        // retries). Each pass contributes the items up to and including its
        // first error; nothing past "error1" is expected to appear.
        let expected = vec![
            // Initial attempt
            Ok(1),
            Err("error1"),
            // First retry
            Ok(1),
            Err("error1"),
            // Second retry
            Ok(1),
            Err("error1"),
        ];
        assert_eq!(collected, expected);
    });
}
/// Exercises `retry_with_policy_rs2` with `RetryPolicy::Fixed` (one retry, 100ms delay).
#[test]
fn test_retry_with_policy_fixed_delay() {
    let runtime = Runtime::new().unwrap();
    runtime.block_on(async {
        // The same recipe backs the initial stream and the retry attempt.
        let make_stream = || from_iter(vec![Ok(1), Err("error1"), Ok(3)]);

        let delay = Duration::from_millis(100);
        let retry_policy = RetryPolicy::Fixed {
            max_retries: 1,
            delay,
        };

        // Time the whole run so the pause between attempts can be checked.
        let started = Instant::now();
        let collected: Vec<_> = make_stream()
            .retry_with_policy_rs2(retry_policy, make_stream)
            .collect()
            .await;
        let elapsed = started.elapsed();

        // Two passes are expected (the initial one plus a single retry), each
        // contributing the items up to and including its first error.
        let expected = vec![
            // Initial attempt
            Ok(1),
            Err("error1"),
            // Retry
            Ok(1),
            Err("error1"),
        ];
        assert_eq!(collected, expected);

        // The run must have taken at least as long as the configured delay.
        assert!(elapsed >= delay, "Expected delay of at least 100ms");
    });
}
/// Exercises `map_error_rs2`: `Err` payloads are rewritten, `Ok` values are expected unchanged.
#[test]
fn test_map_error() {
    let runtime = Runtime::new().unwrap();
    runtime.block_on(async {
        // Input alternates successes and failures.
        let source = from_iter(vec![Ok(1), Err("error1"), Ok(3), Err("error2")]);

        // Turn each `&str` error into an owned, prefixed `String`.
        let mapped: Vec<_> = source
            .map_error_rs2(|err| format!("Mapped: {}", err))
            .collect()
            .await;

        // Order is expected to be preserved; only the error payloads differ.
        let expected = vec![
            Ok(1),
            Err(String::from("Mapped: error1")),
            Ok(3),
            Err(String::from("Mapped: error2")),
        ];
        assert_eq!(mapped, expected);
    });
}
/// Exercises `or_else_rs2`: each error is swapped for a fallback value.
#[test]
fn test_or_else() {
    let runtime = Runtime::new().unwrap();
    runtime.block_on(async {
        // Input alternates successes and failures.
        let source = from_iter(vec![Ok(1), Err("error1"), Ok(3), Err("error2")]);

        // Each known error maps to its own fallback; anything else becomes 0.
        let recovered: Vec<_> = source
            .or_else_rs2(|err| match err {
                "error1" => 42,
                "error2" => 43,
                _ => 0,
            })
            .collect()
            .await;

        // The output is expected to be plain values, with fallbacks sitting
        // in the positions the errors occupied.
        let expected = vec![1, 42, 3, 43];
        assert_eq!(recovered, expected);
    });
}
/// Exercises `collect_ok_rs2`: the successful values are gathered into one `Vec`.
#[test]
fn test_collect_ok() {
    let runtime = Runtime::new().unwrap();
    runtime.block_on(async {
        // Input mixes three successes with two failures.
        let source = from_iter(vec![Ok(1), Err("error1"), Ok(3), Err("error2"), Ok(5)]);

        let batches: Vec<_> = source.collect_ok_rs2().collect().await;

        // Exactly one emitted item is expected: a Vec of the `Ok` payloads in
        // their original order, with the errors left out.
        let expected = vec![vec![1, 3, 5]];
        assert_eq!(batches, expected);
    });
}
/// Exercises `collect_err_rs2`: the error values are gathered into one `Vec`.
#[test]
fn test_collect_err() {
    let runtime = Runtime::new().unwrap();
    runtime.block_on(async {
        // Input mixes three successes with two failures.
        let source = from_iter(vec![Ok(1), Err("error1"), Ok(3), Err("error2"), Ok(5)]);

        let batches: Vec<_> = source.collect_err_rs2().collect().await;

        // Exactly one emitted item is expected: a Vec of the `Err` payloads in
        // their original order, with the successes left out.
        let expected = vec![vec!["error1", "error2"]];
        assert_eq!(batches, expected);
    });
}