1use std::time::{SystemTime, UNIX_EPOCH};
2
3use serde::{Deserialize, Serialize};
4
5use crate::asupersync::{
6 codec::ArtifactCodec,
7 config::AsupersyncConfig,
8 error::AsupersyncError,
9 integrity::{IntegrityProof, IntegrityVerifier},
10 transport::{TransferStatus, TransportLayer},
11};
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub enum RecoveryOutcome {
16 Recovered,
17 RetryScheduled,
18 Rejected,
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
22pub struct RecoveryPlan {
23 pub artifact_id: String,
24 pub max_attempts: u32,
25 pub deadline_unix_ms: u64,
26}
27
28#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
29pub struct RecoveryReport {
30 pub artifact_id: String,
31 pub attempts: u32,
32 pub outcome: RecoveryOutcome,
33 pub transfer_status: TransferStatus,
34 pub integrity: Option<IntegrityProof>,
35}
36
37pub trait RecoveryPolicy {
38 fn should_retry(&self, attempt: u32, max_attempts: u32) -> bool;
39
40 fn classify(
41 &self,
42 transfer_status: TransferStatus,
43 attempt: u32,
44 max_attempts: u32,
45 ) -> RecoveryOutcome {
46 match transfer_status {
47 TransferStatus::Completed => RecoveryOutcome::Recovered,
48 TransferStatus::RetryableFailure => {
49 if self.should_retry(attempt, max_attempts) {
50 RecoveryOutcome::RetryScheduled
51 } else {
52 RecoveryOutcome::Rejected
53 }
54 }
55 TransferStatus::PermanentFailure => RecoveryOutcome::Rejected,
56 }
57 }
58}
59
60#[derive(Debug, Clone, Copy, Default)]
61pub struct ConservativeRecoveryPolicy;
62
63impl RecoveryPolicy for ConservativeRecoveryPolicy {
64 fn should_retry(&self, attempt: u32, max_attempts: u32) -> bool {
65 attempt < max_attempts
66 }
67}
68
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, and
/// `u64::MAX` when the millisecond count does not fit in a `u64`.
fn current_unix_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
75
76fn recovery_deadline_expired(deadline_unix_ms: u64) -> bool {
77 deadline_unix_ms != 0 && current_unix_ms() > deadline_unix_ms
78}
79
80pub fn recover_once<C, T, V, P>(
81 codec: &C,
82 transport: &T,
83 verifier: &V,
84 policy: &P,
85 config: &AsupersyncConfig,
86 plan: &RecoveryPlan,
87 expected_digest: &str,
88) -> Result<RecoveryReport, AsupersyncError>
89where
90 C: ArtifactCodec,
91 T: TransportLayer,
92 V: IntegrityVerifier,
93 P: RecoveryPolicy,
94{
95 if plan.max_attempts == 0 {
96 return Err(AsupersyncError::Configuration(
97 "max_attempts must be greater than zero",
98 ));
99 }
100
101 let mut attempts = 0_u32;
107 let should_retry = |attempt: u32| {
108 attempt < plan.max_attempts && policy.should_retry(attempt, plan.max_attempts)
109 };
110 loop {
111 if recovery_deadline_expired(plan.deadline_unix_ms) {
112 return Err(AsupersyncError::RecoveryExhausted {
113 artifact_id: plan.artifact_id.clone(),
114 attempts,
115 });
116 }
117
118 attempts = attempts.saturating_add(1);
119
120 let encoded = match transport.receive(&plan.artifact_id, config) {
121 Ok(encoded) => encoded,
122 Err(_) => {
123 if should_retry(attempts) {
124 continue;
125 }
126 return Err(AsupersyncError::RecoveryExhausted {
127 artifact_id: plan.artifact_id.clone(),
128 attempts,
129 });
130 }
131 };
132
133 let payload = match codec.decode(&encoded, config) {
134 Ok(p) => p,
135 Err(_) => {
136 if should_retry(attempts) {
137 continue;
138 }
139 return Err(AsupersyncError::RecoveryExhausted {
140 artifact_id: plan.artifact_id.clone(),
141 attempts,
142 });
143 }
144 };
145 match verifier.verify(&plan.artifact_id, &payload.bytes, expected_digest) {
146 Ok(integrity) => {
147 return Ok(RecoveryReport {
148 artifact_id: plan.artifact_id.clone(),
149 attempts,
150 outcome: RecoveryOutcome::Recovered,
151 transfer_status: TransferStatus::Completed,
152 integrity: Some(integrity),
153 });
154 }
155 Err(_) => {
156 if should_retry(attempts) {
157 continue;
158 }
159 return Err(AsupersyncError::RecoveryExhausted {
160 artifact_id: plan.artifact_id.clone(),
161 attempts,
162 });
163 }
164 }
165 }
166}
167
#[cfg(test)]
mod test_recover_once_bounded_loop_bc6fa4 {
    use std::cell::Cell;

    use super::*;
    use crate::asupersync::{
        codec::{ArtifactPayload, EncodedArtifact, PassthroughCodec},
        config::{AsupersyncConfig, CapabilitySet, CxCapability},
        integrity::{Fnv1aVerifier, IntegrityProof},
        transport::{InMemoryTransport, TransferReport},
    };

    type RecoveryResult = Result<RecoveryReport, AsupersyncError>;

    /// Policy that always asks for another attempt, so only the driver's own
    /// budget check can stop the loop.
    struct AlwaysRetryPolicy;

    impl RecoveryPolicy for AlwaysRetryPolicy {
        fn should_retry(&self, _attempt: u32, _max_attempts: u32) -> bool {
            true
        }
    }

    /// Transport whose every operation fails.
    struct AlwaysFailingTransport;

    impl TransportLayer for AlwaysFailingTransport {
        fn send(
            &self,
            _artifact: EncodedArtifact,
            _config: &AsupersyncConfig,
        ) -> Result<TransferReport, AsupersyncError> {
            Err(AsupersyncError::Transport(
                "always-failing send".to_string(),
            ))
        }

        fn receive(
            &self,
            _artifact_id: &str,
            _config: &AsupersyncConfig,
        ) -> Result<EncodedArtifact, AsupersyncError> {
            Err(AsupersyncError::Transport(
                "always-failing receive".to_string(),
            ))
        }

        fn required_capabilities(&self) -> CapabilitySet {
            CapabilitySet::for_capability(CxCapability::Io)
        }
    }

    /// Failing transport that records how many times `receive` was invoked.
    struct CountingFailingTransport {
        receive_calls: Cell<u32>,
    }

    impl CountingFailingTransport {
        fn new() -> Self {
            Self {
                receive_calls: Cell::new(0),
            }
        }
    }

    impl TransportLayer for CountingFailingTransport {
        fn send(
            &self,
            _artifact: EncodedArtifact,
            _config: &AsupersyncConfig,
        ) -> Result<TransferReport, AsupersyncError> {
            Err(AsupersyncError::Transport(
                "counting-failing send".to_string(),
            ))
        }

        fn receive(
            &self,
            _artifact_id: &str,
            _config: &AsupersyncConfig,
        ) -> Result<EncodedArtifact, AsupersyncError> {
            let seen_so_far = self.receive_calls.get();
            self.receive_calls.set(seen_so_far.saturating_add(1));
            Err(AsupersyncError::Transport(
                "counting-failing receive".to_string(),
            ))
        }

        fn required_capabilities(&self) -> CapabilitySet {
            CapabilitySet::for_capability(CxCapability::Io)
        }
    }

    /// Verifier that rejects every payload.
    struct FailingVerifier;

    impl IntegrityVerifier for FailingVerifier {
        fn verify(
            &self,
            _artifact_id: &str,
            _bytes: &[u8],
            _expected_digest: &str,
        ) -> Result<IntegrityProof, AsupersyncError> {
            Err(AsupersyncError::IntegrityMismatch {
                artifact_id: "failing-verifier".to_string(),
                expected: "x".to_string(),
                observed: "y".to_string(),
            })
        }
    }

    /// 64-bit FNV-1a digest of `bytes`, rendered as 16 lowercase hex digits.
    fn fnv1a_hex_for_test(bytes: &[u8]) -> String {
        const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
        const PRIME: u64 = 0x100000001b3;

        let digest = bytes.iter().fold(OFFSET_BASIS, |state, byte| {
            (state ^ u64::from(*byte)).wrapping_mul(PRIME)
        });
        format!("{digest:016x}")
    }

    /// Configuration granting only the `Io` capability.
    fn io_config() -> AsupersyncConfig {
        AsupersyncConfig::default()
            .with_capabilities(CapabilitySet::for_capability(CxCapability::Io))
    }

    /// Shorthand for building a plan from its three fields.
    fn plan_for(artifact_id: &str, max_attempts: u32, deadline_unix_ms: u64) -> RecoveryPlan {
        RecoveryPlan {
            artifact_id: artifact_id.to_string(),
            max_attempts,
            deadline_unix_ms,
        }
    }

    /// Runs `recover_once` with the passthrough codec, the rejecting verifier
    /// and the always-retry policy against the given transport.
    fn recover_against<T: TransportLayer>(
        transport: &T,
        plan: &RecoveryPlan,
        expected_digest: &str,
    ) -> RecoveryResult {
        recover_once(
            &PassthroughCodec,
            transport,
            &FailingVerifier,
            &AlwaysRetryPolicy,
            &io_config(),
            plan,
            expected_digest,
        )
    }

    /// Asserts that `result` is `RecoveryExhausted` for the given artifact and
    /// attempt count; `expectation` is the leading part of the failure message.
    #[track_caller]
    fn assert_exhausted(
        result: &RecoveryResult,
        expected_id: &str,
        expected_attempts: u32,
        expectation: &str,
    ) {
        let as_expected = matches!(
            result,
            Err(AsupersyncError::RecoveryExhausted {
                artifact_id,
                attempts,
            }) if artifact_id == expected_id && *attempts == expected_attempts
        );
        assert!(as_expected, "{expectation}, got {result:?}");
    }

    #[test]
    fn recover_once_terminates_at_max_attempts_under_buggy_policy() {
        let plan = plan_for("test-artifact", 5, 0);

        let result = recover_against(&AlwaysFailingTransport, &plan, "any-digest");

        assert_exhausted(
            &result,
            "test-artifact",
            5,
            "expected RecoveryExhausted after exactly 5 attempts",
        );
    }

    #[test]
    fn recover_once_zero_max_attempts_returns_configuration_error() {
        let plan = plan_for("test", 0, 0);

        let result = recover_against(&AlwaysFailingTransport, &plan, "x");

        assert!(matches!(result, Err(AsupersyncError::Configuration(_))));
    }

    #[test]
    fn recover_once_expired_deadline_makes_zero_transport_attempts_ehn2c() {
        let transport = CountingFailingTransport::new();
        // One millisecond after the epoch is always in the past.
        let plan = plan_for("expired-artifact", 3, 1);

        let result = recover_against(&transport, &plan, "any-digest");

        assert_exhausted(
            &result,
            "expired-artifact",
            0,
            "expected deadline RecoveryExhausted",
        );
        assert_eq!(
            transport.receive_calls.get(),
            0,
            "expired recovery plans must fail before transport.receive"
        );
    }

    #[test]
    fn recover_once_future_deadline_preserves_retry_budget_ehn2c() {
        let transport = CountingFailingTransport::new();
        let one_minute_from_now = current_unix_ms().saturating_add(60_000);
        let plan = plan_for("future-artifact", 2, one_minute_from_now);

        let result = recover_against(&transport, &plan, "any-digest");

        assert_exhausted(
            &result,
            "future-artifact",
            2,
            "expected retry-budget RecoveryExhausted",
        );
        assert_eq!(transport.receive_calls.get(), 2);
    }

    #[test]
    fn recover_once_round_trips_real_codec_transport_and_verifier_2ryvf()
    -> Result<(), AsupersyncError> {
        let codec = PassthroughCodec;
        let transport = InMemoryTransport::new();
        let verifier = Fnv1aVerifier;
        let capabilities = CapabilitySet::for_capability(CxCapability::Io)
            .union(CapabilitySet::for_capability(CxCapability::Remote));
        let config = AsupersyncConfig::default().with_capabilities(capabilities);

        // Publish an artifact through the real codec and transport first.
        let bytes = b"recoverable artifact payload".to_vec();
        let expected_digest = fnv1a_hex_for_test(&bytes);
        let payload = ArtifactPayload {
            artifact_id: "recoverable-artifact".to_string(),
            bytes,
            expected_digest: Some(expected_digest.clone()),
        };
        let encoded = codec.encode(&payload, &config)?;
        transport.send(encoded, &config)?;

        let plan = plan_for(&payload.artifact_id, 3, 0);
        let report = recover_once(
            &codec,
            &transport,
            &verifier,
            &ConservativeRecoveryPolicy,
            &config,
            &plan,
            &expected_digest,
        )?;

        assert_eq!(report.artifact_id, "recoverable-artifact");
        assert_eq!(report.attempts, 1);
        assert_eq!(report.outcome, RecoveryOutcome::Recovered);
        assert_eq!(report.transfer_status, TransferStatus::Completed);

        // A missing proof is reported as a test failure through the error path.
        let proof = report
            .integrity
            .ok_or_else(|| AsupersyncError::IntegrityMismatch {
                artifact_id: "recoverable-artifact".to_string(),
                expected: expected_digest,
                observed: "<missing proof>".to_string(),
            })?;
        assert!(proof.verified);
        assert_eq!(proof.algorithm, "fnv1a64");
        assert_eq!(proof.expected_digest, proof.observed_digest);
        Ok(())
    }
}