// fp_runtime/asupersync/recovery.rs

1use std::time::{SystemTime, UNIX_EPOCH};
2
3use serde::{Deserialize, Serialize};
4
5use crate::asupersync::{
6    codec::ArtifactCodec,
7    config::AsupersyncConfig,
8    error::AsupersyncError,
9    integrity::{IntegrityProof, IntegrityVerifier},
10    transport::{TransferStatus, TransportLayer},
11};
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub enum RecoveryOutcome {
16    Recovered,
17    RetryScheduled,
18    Rejected,
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
22pub struct RecoveryPlan {
23    pub artifact_id: String,
24    pub max_attempts: u32,
25    pub deadline_unix_ms: u64,
26}
27
28#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
29pub struct RecoveryReport {
30    pub artifact_id: String,
31    pub attempts: u32,
32    pub outcome: RecoveryOutcome,
33    pub transfer_status: TransferStatus,
34    pub integrity: Option<IntegrityProof>,
35}
36
37pub trait RecoveryPolicy {
38    fn should_retry(&self, attempt: u32, max_attempts: u32) -> bool;
39
40    fn classify(
41        &self,
42        transfer_status: TransferStatus,
43        attempt: u32,
44        max_attempts: u32,
45    ) -> RecoveryOutcome {
46        match transfer_status {
47            TransferStatus::Completed => RecoveryOutcome::Recovered,
48            TransferStatus::RetryableFailure => {
49                if self.should_retry(attempt, max_attempts) {
50                    RecoveryOutcome::RetryScheduled
51                } else {
52                    RecoveryOutcome::Rejected
53                }
54            }
55            TransferStatus::PermanentFailure => RecoveryOutcome::Rejected,
56        }
57    }
58}
59
60#[derive(Debug, Clone, Copy, Default)]
61pub struct ConservativeRecoveryPolicy;
62
63impl RecoveryPolicy for ConservativeRecoveryPolicy {
64    fn should_retry(&self, attempt: u32, max_attempts: u32) -> bool {
65        attempt < max_attempts
66    }
67}
68
/// Milliseconds since the Unix epoch according to the system clock.
///
/// A clock reading before the epoch yields `0`. A value that does not fit in
/// `u64` saturates to `u64::MAX`.
fn current_unix_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}

/// Reports whether `deadline_unix_ms` lies strictly in the past.
///
/// A deadline of `0` means "no deadline" and never expires. The clock is not
/// consulted in that case.
fn recovery_deadline_expired(deadline_unix_ms: u64) -> bool {
    if deadline_unix_ms == 0 {
        return false;
    }
    deadline_unix_ms < current_unix_ms()
}
79
80pub fn recover_once<C, T, V, P>(
81    codec: &C,
82    transport: &T,
83    verifier: &V,
84    policy: &P,
85    config: &AsupersyncConfig,
86    plan: &RecoveryPlan,
87    expected_digest: &str,
88) -> Result<RecoveryReport, AsupersyncError>
89where
90    C: ArtifactCodec,
91    T: TransportLayer,
92    V: IntegrityVerifier,
93    P: RecoveryPolicy,
94{
95    if plan.max_attempts == 0 {
96        return Err(AsupersyncError::Configuration(
97            "max_attempts must be greater than zero",
98        ));
99    }
100
101    // Per br-frankenpandas-bc6fa4: enforce a hard ceiling at max_attempts even
102    // if a buggy RecoveryPolicy returns should_retry=true past the limit.
103    // saturating_add prevents u32 overflow panics in debug; the explicit
104    // attempts >= max_attempts gate makes the contract independent of the
105    // policy implementation.
106    let mut attempts = 0_u32;
107    let should_retry = |attempt: u32| {
108        attempt < plan.max_attempts && policy.should_retry(attempt, plan.max_attempts)
109    };
110    loop {
111        if recovery_deadline_expired(plan.deadline_unix_ms) {
112            return Err(AsupersyncError::RecoveryExhausted {
113                artifact_id: plan.artifact_id.clone(),
114                attempts,
115            });
116        }
117
118        attempts = attempts.saturating_add(1);
119
120        let encoded = match transport.receive(&plan.artifact_id, config) {
121            Ok(encoded) => encoded,
122            Err(_) => {
123                if should_retry(attempts) {
124                    continue;
125                }
126                return Err(AsupersyncError::RecoveryExhausted {
127                    artifact_id: plan.artifact_id.clone(),
128                    attempts,
129                });
130            }
131        };
132
133        let payload = match codec.decode(&encoded, config) {
134            Ok(p) => p,
135            Err(_) => {
136                if should_retry(attempts) {
137                    continue;
138                }
139                return Err(AsupersyncError::RecoveryExhausted {
140                    artifact_id: plan.artifact_id.clone(),
141                    attempts,
142                });
143            }
144        };
145        match verifier.verify(&plan.artifact_id, &payload.bytes, expected_digest) {
146            Ok(integrity) => {
147                return Ok(RecoveryReport {
148                    artifact_id: plan.artifact_id.clone(),
149                    attempts,
150                    outcome: RecoveryOutcome::Recovered,
151                    transfer_status: TransferStatus::Completed,
152                    integrity: Some(integrity),
153                });
154            }
155            Err(_) => {
156                if should_retry(attempts) {
157                    continue;
158                }
159                return Err(AsupersyncError::RecoveryExhausted {
160                    artifact_id: plan.artifact_id.clone(),
161                    attempts,
162                });
163            }
164        }
165    }
166}
167
#[cfg(test)]
mod test_recover_once_bounded_loop_bc6fa4 {
    use std::cell::Cell;

    use super::*;
    use crate::asupersync::{
        codec::{ArtifactPayload, EncodedArtifact, PassthroughCodec},
        config::{AsupersyncConfig, CapabilitySet, CxCapability},
        integrity::{Fnv1aVerifier, IntegrityProof},
        transport::InMemoryTransport,
    };

    /// A buggy policy that always says "retry". Without the hard ceiling added
    /// in br-bc6fa4, recover_once would keep retrying indefinitely against a
    /// failing transport.
    struct AlwaysRetryPolicy;

    impl RecoveryPolicy for AlwaysRetryPolicy {
        fn should_retry(&self, _attempt: u32, _max_attempts: u32) -> bool {
            true
        }
    }

    /// A transport whose `send` and `receive` always fail. `receive` also
    /// counts its calls, so tests can bound real I/O and not just the
    /// reported attempt counter.
    struct CountingFailingTransport {
        receive_calls: Cell<u32>,
    }

    impl CountingFailingTransport {
        fn new() -> Self {
            Self {
                receive_calls: Cell::new(0),
            }
        }
    }

    impl TransportLayer for CountingFailingTransport {
        fn send(
            &self,
            _artifact: EncodedArtifact,
            _config: &AsupersyncConfig,
        ) -> Result<crate::asupersync::transport::TransferReport, AsupersyncError> {
            Err(AsupersyncError::Transport(
                "counting-failing send".to_string(),
            ))
        }
        fn receive(
            &self,
            _artifact_id: &str,
            _config: &AsupersyncConfig,
        ) -> Result<EncodedArtifact, AsupersyncError> {
            self.receive_calls
                .set(self.receive_calls.get().saturating_add(1));
            Err(AsupersyncError::Transport(
                "counting-failing receive".to_string(),
            ))
        }
        fn required_capabilities(&self) -> CapabilitySet {
            CapabilitySet::for_capability(CxCapability::Io)
        }
    }

    /// A verifier that rejects every payload.
    struct FailingVerifier;

    impl IntegrityVerifier for FailingVerifier {
        fn verify(
            &self,
            _artifact_id: &str,
            _bytes: &[u8],
            _expected_digest: &str,
        ) -> Result<IntegrityProof, AsupersyncError> {
            Err(AsupersyncError::IntegrityMismatch {
                artifact_id: "failing-verifier".to_string(),
                expected: "x".to_string(),
                observed: "y".to_string(),
            })
        }
    }

    /// Reference 64-bit FNV-1a hex digest, used to build expected digests that
    /// `Fnv1aVerifier` accepts.
    fn fnv1a_hex_for_test(bytes: &[u8]) -> String {
        let mut hash = 0xcbf29ce484222325_u64;
        for byte in bytes {
            hash ^= u64::from(*byte);
            hash = hash.wrapping_mul(0x100000001b3);
        }
        format!("{hash:016x}")
    }

    /// Config granting only the `Io` capability.
    fn io_config() -> AsupersyncConfig {
        AsupersyncConfig::default()
            .with_capabilities(CapabilitySet::for_capability(CxCapability::Io))
    }

    /// Config granting `Io` and `Remote`, as the in-memory round trip uses.
    fn io_remote_config() -> AsupersyncConfig {
        AsupersyncConfig::default().with_capabilities(
            CapabilitySet::for_capability(CxCapability::Io)
                .union(CapabilitySet::for_capability(CxCapability::Remote)),
        )
    }

    /// Encodes `bytes` as `artifact_id` with `PassthroughCodec`, sends it through
    /// `transport`, and returns the FNV-1a digest of `bytes`.
    fn stage_artifact(
        transport: &InMemoryTransport,
        config: &AsupersyncConfig,
        artifact_id: &str,
        bytes: &[u8],
    ) -> Result<String, AsupersyncError> {
        let expected_digest = fnv1a_hex_for_test(bytes);
        let payload = ArtifactPayload {
            artifact_id: artifact_id.to_string(),
            bytes: bytes.to_vec(),
            expected_digest: Some(expected_digest.clone()),
        };
        let encoded = PassthroughCodec.encode(&payload, config)?;
        transport.send(encoded, config)?;
        Ok(expected_digest)
    }

    #[test]
    fn recover_once_terminates_at_max_attempts_under_buggy_policy() {
        // Without the hard ceiling fix, this test would never return.
        let transport = CountingFailingTransport::new();
        let plan = RecoveryPlan {
            artifact_id: "test-artifact".to_string(),
            max_attempts: 5,
            deadline_unix_ms: 0,
        };
        let config = io_config();
        let result = recover_once(
            &PassthroughCodec,
            &transport,
            &FailingVerifier,
            &AlwaysRetryPolicy,
            &config,
            &plan,
            "any-digest",
        );
        assert!(
            matches!(
                &result,
                Err(AsupersyncError::RecoveryExhausted {
                    artifact_id,
                    attempts,
                }) if artifact_id == "test-artifact" && *attempts == 5
            ),
            "expected RecoveryExhausted after exactly 5 attempts, got {result:?}"
        );
        assert_eq!(
            transport.receive_calls.get(),
            5,
            "the ceiling must bound real transport calls, not just the reported count"
        );
    }

    #[test]
    fn recover_once_zero_max_attempts_returns_configuration_error() {
        let transport = CountingFailingTransport::new();
        let plan = RecoveryPlan {
            artifact_id: "test".to_string(),
            max_attempts: 0,
            deadline_unix_ms: 0,
        };
        let config = io_config();
        let result = recover_once(
            &PassthroughCodec,
            &transport,
            &FailingVerifier,
            &AlwaysRetryPolicy,
            &config,
            &plan,
            "x",
        );
        assert!(matches!(result, Err(AsupersyncError::Configuration(_))));
        assert_eq!(
            transport.receive_calls.get(),
            0,
            "an invalid plan must be rejected before any transport I/O"
        );
    }

    #[test]
    fn recover_once_expired_deadline_makes_zero_transport_attempts_ehn2c() {
        let transport = CountingFailingTransport::new();
        let plan = RecoveryPlan {
            artifact_id: "expired-artifact".to_string(),
            max_attempts: 3,
            deadline_unix_ms: 1,
        };
        let config = io_config();

        let result = recover_once(
            &PassthroughCodec,
            &transport,
            &FailingVerifier,
            &AlwaysRetryPolicy,
            &config,
            &plan,
            "any-digest",
        );

        assert!(
            matches!(
                &result,
                Err(AsupersyncError::RecoveryExhausted {
                    artifact_id,
                    attempts,
                }) if artifact_id == "expired-artifact" && *attempts == 0
            ),
            "expected deadline RecoveryExhausted, got {result:?}"
        );
        assert_eq!(
            transport.receive_calls.get(),
            0,
            "expired recovery plans must fail before transport.receive"
        );
    }

    #[test]
    fn recover_once_future_deadline_preserves_retry_budget_ehn2c() {
        let transport = CountingFailingTransport::new();
        let plan = RecoveryPlan {
            artifact_id: "future-artifact".to_string(),
            max_attempts: 2,
            deadline_unix_ms: current_unix_ms().saturating_add(60_000),
        };
        let config = io_config();

        let result = recover_once(
            &PassthroughCodec,
            &transport,
            &FailingVerifier,
            &AlwaysRetryPolicy,
            &config,
            &plan,
            "any-digest",
        );

        assert!(
            matches!(
                &result,
                Err(AsupersyncError::RecoveryExhausted {
                    artifact_id,
                    attempts,
                }) if artifact_id == "future-artifact" && *attempts == 2
            ),
            "expected retry-budget RecoveryExhausted, got {result:?}"
        );
        assert_eq!(transport.receive_calls.get(), 2);
    }

    #[test]
    fn recover_once_exhausts_budget_when_verification_fails()
    -> Result<(), AsupersyncError> {
        // Unlike the failing-transport tests, the artifact is actually delivered
        // here, so attempts reach the verifier and fail there.
        let transport = InMemoryTransport::new();
        let config = io_remote_config();
        stage_artifact(
            &transport,
            &config,
            "unverifiable-artifact",
            b"payload that never verifies",
        )?;
        let plan = RecoveryPlan {
            artifact_id: "unverifiable-artifact".to_string(),
            max_attempts: 3,
            deadline_unix_ms: 0,
        };

        let result = recover_once(
            &PassthroughCodec,
            &transport,
            &FailingVerifier,
            &AlwaysRetryPolicy,
            &config,
            &plan,
            "any-digest",
        );

        assert!(
            matches!(
                &result,
                Err(AsupersyncError::RecoveryExhausted {
                    artifact_id,
                    attempts,
                }) if artifact_id == "unverifiable-artifact" && *attempts == 3
            ),
            "expected RecoveryExhausted after 3 failed verifications, got {result:?}"
        );
        Ok(())
    }

    #[test]
    fn conservative_policy_classifies_transfer_statuses() {
        let policy = ConservativeRecoveryPolicy;
        assert_eq!(
            policy.classify(TransferStatus::Completed, 1, 3),
            RecoveryOutcome::Recovered
        );
        assert_eq!(
            policy.classify(TransferStatus::RetryableFailure, 1, 3),
            RecoveryOutcome::RetryScheduled
        );
        assert_eq!(
            policy.classify(TransferStatus::RetryableFailure, 3, 3),
            RecoveryOutcome::Rejected
        );
        assert_eq!(
            policy.classify(TransferStatus::PermanentFailure, 1, 3),
            RecoveryOutcome::Rejected
        );
    }

    #[test]
    fn recover_once_round_trips_real_codec_transport_and_verifier_2ryvf()
    -> Result<(), AsupersyncError> {
        let transport = InMemoryTransport::new();
        let config = io_remote_config();
        let expected_digest = stage_artifact(
            &transport,
            &config,
            "recoverable-artifact",
            b"recoverable artifact payload",
        )?;

        let plan = RecoveryPlan {
            artifact_id: "recoverable-artifact".to_string(),
            max_attempts: 3,
            deadline_unix_ms: 0,
        };
        let report = recover_once(
            &PassthroughCodec,
            &transport,
            &Fnv1aVerifier,
            &ConservativeRecoveryPolicy,
            &config,
            &plan,
            &expected_digest,
        )?;

        assert_eq!(report.artifact_id, "recoverable-artifact");
        assert_eq!(report.attempts, 1);
        assert_eq!(report.outcome, RecoveryOutcome::Recovered);
        assert_eq!(report.transfer_status, TransferStatus::Completed);
        let Some(proof) = report.integrity else {
            return Err(AsupersyncError::IntegrityMismatch {
                artifact_id: "recoverable-artifact".to_string(),
                expected: expected_digest,
                observed: "<missing proof>".to_string(),
            });
        };
        assert!(proof.verified);
        assert_eq!(proof.algorithm, "fnv1a64");
        assert_eq!(proof.expected_digest, proof.observed_digest);
        Ok(())
    }
}