1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
#![allow(clippy::disallowed_methods)]
//! Property-based tests for TimeSyncManager concurrent access.
//!
//! These tests verify that the TimeSyncManager maintains consistency
//! under concurrent access from multiple threads.
//!
//! **Feature: code-refactoring-improvements**
//! **Validates: Requirements 5.4**
use ccxt_exchanges::binance::time_sync::{TimeSyncConfig, TimeSyncManager};
use proptest::prelude::*;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
// ============================================================================
// Test Generators
// ============================================================================
/// Strategy producing simulated server-clock offsets, in milliseconds.
///
/// Covers clock drift of up to ten seconds in either direction.
fn offset_strategy() -> impl Strategy<Value = i64> {
    // ±10 s expressed in milliseconds; the upper bound is exclusive.
    const MAX_ABS_OFFSET_MS: i64 = 10_000;
    -MAX_ABS_OFFSET_MS..MAX_ABS_OFFSET_MS
}
/// Strategy for how many concurrent worker threads to spawn (2..=9).
fn thread_count_strategy() -> impl Strategy<Value = usize> {
    const MIN_THREADS: usize = 2;
    const MAX_THREADS: usize = 10; // exclusive upper bound
    MIN_THREADS..MAX_THREADS
}
/// Strategy for how many operations each worker thread performs (10..=49).
fn ops_per_thread_strategy() -> impl Strategy<Value = usize> {
    const MIN_OPS: usize = 10;
    const MAX_OPS: usize = 50; // exclusive upper bound
    MIN_OPS..MAX_OPS
}
/// Strategy for sync intervals from one second up to (but excluding) two minutes.
fn sync_interval_strategy() -> impl Strategy<Value = Duration> {
    const MIN_SECS: u64 = 1;
    const MAX_SECS: u64 = 120; // exclusive upper bound
    (MIN_SECS..MAX_SECS).prop_map(Duration::from_secs)
}
// ============================================================================
// Property Tests: Concurrent Access Consistency
// ============================================================================
proptest! {
#![proptest_config(ProptestConfig::with_cases(100))]
/// **Feature: code-refactoring-improvements, Property: Concurrent access consistency**
/// **Validates: Requirements 5.4**
///
/// *For any* number of concurrent threads calling `get_server_timestamp()`,
/// all calls SHALL complete without panic or data race.
#[test]
fn prop_concurrent_reads_complete_without_panic(
thread_count in thread_count_strategy(),
ops_per_thread in ops_per_thread_strategy(),
initial_offset in offset_strategy()
) {
let manager = Arc::new(TimeSyncManager::new());
// Initialize with an offset so readers observe an initialized manager.
let base_time = ccxt_core::time::TimestampUtils::now_ms();
manager.update_offset(base_time + initial_offset);
let mut handles = vec![];
// Spawn reader threads; each collects its observed timestamps for later checks.
for _ in 0..thread_count {
let manager_clone = Arc::clone(&manager);
let handle = thread::spawn(move || {
let mut timestamps = Vec::with_capacity(ops_per_thread);
for _ in 0..ops_per_thread {
let ts = manager_clone.get_server_timestamp();
timestamps.push(ts);
}
timestamps
});
handles.push(handle);
}
// All threads should complete successfully.
// join() returns Err only when the thread panicked, so expect() doubles
// as the no-panic assertion of this property.
for handle in handles {
let timestamps = handle.join().expect("Thread should not panic");
// All timestamps should be positive
for ts in timestamps {
prop_assert!(ts > 0, "Timestamp should be positive: {}", ts);
}
}
}
/// **Feature: code-refactoring-improvements, Property: Concurrent writes consistency**
/// **Validates: Requirements 5.4**
///
/// *For any* number of concurrent threads calling `update_offset()`,
/// the manager SHALL remain in a consistent state after all writes complete.
#[test]
fn prop_concurrent_writes_maintain_consistency(
thread_count in thread_count_strategy(),
ops_per_thread in ops_per_thread_strategy()
) {
let manager = Arc::new(TimeSyncManager::new());
let mut handles = vec![];
// Spawn writer threads
for i in 0..thread_count {
let manager_clone = Arc::clone(&manager);
let handle = thread::spawn(move || {
for j in 0..ops_per_thread {
// (i * 100 + j) gives each thread/iteration a distinct server time,
// so concurrent writes are distinguishable from one another.
let server_time = ccxt_core::time::TimestampUtils::now_ms()
+ (i * 100 + j) as i64;
manager_clone.update_offset(server_time);
}
});
handles.push(handle);
}
// All threads should complete successfully
for handle in handles {
handle.join().expect("Thread should not panic");
}
// Manager should be in a consistent state: some write won the race,
// so the manager is initialized and a sync time was recorded.
prop_assert!(manager.is_initialized(), "Manager should be initialized after writes");
prop_assert!(manager.last_sync_time() > 0, "Last sync time should be set");
}
/// **Feature: code-refactoring-improvements, Property: Mixed read/write consistency**
/// **Validates: Requirements 5.4**
///
/// *For any* mix of concurrent readers and writers, the manager SHALL
/// maintain consistency without data races.
#[test]
fn prop_concurrent_read_write_consistency(
reader_count in 2usize..8,
writer_count in 1usize..4,
ops_per_thread in ops_per_thread_strategy()
) {
let manager = Arc::new(TimeSyncManager::new());
// Initialize the manager so readers never see an uninitialized state here.
manager.update_offset(ccxt_core::time::TimestampUtils::now_ms());
let mut handles = vec![];
// Spawn reader threads exercising the full read-only API surface.
for _ in 0..reader_count {
let manager_clone = Arc::clone(&manager);
let handle = thread::spawn(move || {
for _ in 0..ops_per_thread {
let _ = manager_clone.get_server_timestamp();
let _ = manager_clone.get_offset();
let _ = manager_clone.is_initialized();
let _ = manager_clone.needs_resync();
}
});
handles.push(handle);
}
// Spawn writer threads
for i in 0..writer_count {
let manager_clone = Arc::clone(&manager);
let handle = thread::spawn(move || {
for j in 0..ops_per_thread {
// Distinct per-thread/per-iteration offsets, as in the writes-only test.
let server_time = ccxt_core::time::TimestampUtils::now_ms()
+ (i * 50 + j) as i64;
manager_clone.update_offset(server_time);
}
});
handles.push(handle);
}
// All threads should complete successfully
for handle in handles {
handle.join().expect("Thread should not panic");
}
// Verify final state is consistent. All writers have joined, so these
// three reads observe one settled final state.
prop_assert!(manager.is_initialized());
let offset = manager.get_offset();
let last_sync = manager.last_sync_time();
let timestamp = manager.get_server_timestamp();
// Timestamp should be approximately last_sync + offset
let expected_approx = last_sync + offset;
let diff = (timestamp - expected_approx).abs();
// Allow some tolerance for timing differences
// (1 s covers wall-clock advance between the reads above).
prop_assert!(
diff < 1000,
"Timestamp {} should be close to expected {} (diff: {})",
timestamp, expected_approx, diff
);
}
/// **Feature: code-refactoring-improvements, Property: Reset during concurrent access**
/// **Validates: Requirements 5.4**
///
/// *For any* concurrent access pattern including reset operations,
/// the manager SHALL not panic or corrupt state.
#[test]
fn prop_reset_during_concurrent_access(
thread_count in 2usize..6,
ops_per_thread in 10usize..30
) {
let manager = Arc::new(TimeSyncManager::new());
manager.update_offset(ccxt_core::time::TimestampUtils::now_ms());
let mut handles = vec![];
// Spawn reader threads; they may observe either reset or re-initialized state.
for _ in 0..thread_count {
let manager_clone = Arc::clone(&manager);
let handle = thread::spawn(move || {
for _ in 0..ops_per_thread {
let _ = manager_clone.get_server_timestamp();
let _ = manager_clone.is_initialized();
}
});
handles.push(handle);
}
// Spawn a reset thread that repeatedly clears and re-initializes the manager.
{
let manager_clone = Arc::clone(&manager);
let handle = thread::spawn(move || {
for _ in 0..5 {
manager_clone.reset();
// Brief pause widens the window in which readers can observe
// the reset (uninitialized) state.
thread::sleep(Duration::from_micros(100));
manager_clone.update_offset(ccxt_core::time::TimestampUtils::now_ms());
}
});
handles.push(handle);
}
// All threads should complete successfully
for handle in handles {
handle.join().expect("Thread should not panic");
}
}
/// **Feature: code-refactoring-improvements, Property: Configuration immutability**
/// **Validates: Requirements 5.4**
///
/// *For any* TimeSyncConfig, the configuration should remain unchanged
/// during concurrent access to the manager.
#[test]
fn prop_config_immutable_during_concurrent_access(
sync_interval in sync_interval_strategy(),
thread_count in thread_count_strategy()
) {
let config = TimeSyncConfig {
sync_interval,
auto_sync: true,
max_offset_drift: 5000,
};
let expected_interval = config.sync_interval;
let manager = Arc::new(TimeSyncManager::with_config(config));
manager.update_offset(ccxt_core::time::TimestampUtils::now_ms());
let mut handles = vec![];
// Spawn threads that read config while also performing offset writes.
for _ in 0..thread_count {
let manager_clone = Arc::clone(&manager);
let expected = expected_interval;
let handle = thread::spawn(move || {
for _ in 0..50 {
let config = manager_clone.config();
// Plain assert_eq! (not prop_assert!) because this runs on a
// spawned thread; a failure panics and surfaces via join() below.
assert_eq!(config.sync_interval, expected);
manager_clone.update_offset(ccxt_core::time::TimestampUtils::now_ms());
}
});
handles.push(handle);
}
for handle in handles {
handle.join().expect("Thread should not panic");
}
// Config should still be the same
prop_assert_eq!(manager.config().sync_interval, expected_interval);
}
}
// ============================================================================
// Additional Property Tests
// ============================================================================
proptest! {
    #![proptest_config(ProptestConfig::with_cases(100))]

    /// *For any* offset value, the offset should be correctly stored and retrieved.
    #[test]
    fn prop_offset_storage_retrieval(offset in offset_strategy()) {
        let sync = TimeSyncManager::new();
        // Simulate a server clock that differs from local time by `offset`.
        sync.update_offset(ccxt_core::time::TimestampUtils::now_ms() + offset);
        let stored_offset = sync.get_offset();
        // Tolerance absorbs the wall-clock time elapsed between the two
        // now_ms() calls (the one above and the one inside update_offset).
        let diff = (stored_offset - offset).abs();
        prop_assert!(
            diff < 100,
            "Stored offset {} should be close to input offset {} (diff: {})",
            stored_offset, offset, diff
        );
    }

    /// *For any* server time, update_offset should mark the manager as initialized.
    #[test]
    fn prop_update_offset_initializes(server_time_offset in offset_strategy()) {
        let sync = TimeSyncManager::new();
        // A freshly constructed manager starts uninitialized.
        prop_assert!(!sync.is_initialized());
        sync.update_offset(ccxt_core::time::TimestampUtils::now_ms() + server_time_offset);
        // A single offset update is enough to flip the flag.
        prop_assert!(sync.is_initialized());
    }

    /// *For any* initialized manager, get_server_timestamp should return a reasonable value.
    #[test]
    fn prop_server_timestamp_reasonable(offset in offset_strategy()) {
        let sync = TimeSyncManager::new();
        sync.update_offset(ccxt_core::time::TimestampUtils::now_ms() + offset);
        let timestamp = sync.get_server_timestamp();
        // The reported server time should track local time shifted by the offset.
        let expected_approx = ccxt_core::time::TimestampUtils::now_ms() + offset;
        let diff = (timestamp - expected_approx).abs();
        prop_assert!(
            diff < 1000,
            "Timestamp {} should be close to expected {} (diff: {})",
            timestamp, expected_approx, diff
        );
    }
}
#[cfg(test)]
mod unit_tests {
    use super::*;

    /// Smoke test: four reader threads hammer `get_server_timestamp()`
    /// concurrently and the manager stays initialized throughout.
    #[test]
    fn test_basic_concurrent_access() {
        let shared = Arc::new(TimeSyncManager::new());
        shared.update_offset(ccxt_core::time::TimestampUtils::now_ms());

        let workers: Vec<_> = (0..4)
            .map(|_| {
                let mgr = Arc::clone(&shared);
                thread::spawn(move || {
                    for _ in 0..100 {
                        let _ = mgr.get_server_timestamp();
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }
        assert!(shared.is_initialized());
    }
}