// fsqlite_core/remote_effects.rs
//! Remote effects contract primitives (§4.19.1-§4.19.5, `bd-numl`).
//!
//! This module provides:
//! - explicit RemoteCap gating for remote execution paths,
//! - named computations (no closure shipping),
//! - deterministic idempotency key derivation + dedup store,
//! - lease-backed liveness checks with deterministic escalation,
//! - a cancellation-safe remote eviction saga skeleton.
10use std::collections::{HashMap, HashSet};
11use std::sync::Mutex;
12#[cfg(not(feature = "native"))]
13use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
14use std::thread;
15use std::time::Duration;
16#[cfg(not(feature = "native"))]
17use std::time::Instant;
18#[cfg(feature = "native")]
19use std::time::{SystemTime, UNIX_EPOCH};
20
21#[cfg(feature = "native")]
22use asupersync::combinator::bulkhead::{
23    Bulkhead as AdmissionBulkhead, BulkheadError as AdmissionBulkheadError,
24    BulkheadMetrics as AdmissionBulkheadMetrics, BulkheadPermit as AdmissionPermit, BulkheadPolicy,
25};
26#[cfg(feature = "native")]
27use asupersync::types::Time as AdmissionTime;
28use blake3::Hasher;
29use fsqlite_error::{FrankenError, Result};
30use fsqlite_types::cx::{Cx, cap};
31use fsqlite_types::{IdempotencyKey, ObjectId, RemoteCap, Saga};
32use tracing::{debug, info, warn};
33
34#[cfg(not(feature = "native"))]
35use crate::Bulkhead as AdmissionBulkhead;
36#[cfg(feature = "native")]
37use crate::available_parallelism_or_one;
38#[cfg(not(feature = "native"))]
39use crate::{
40    BulkheadConfig, BulkheadPermit as AdmissionPermit, OverflowPolicy, available_parallelism_or_one,
41};
42
// Bead identifier stamped on every structured log event from this module.
const BEAD_ID: &str = "bd-numl";
// Upper bound of the balanced-profile remote concurrency cap (clamp ceiling).
const MAX_BALANCED_REMOTE_IN_FLIGHT: usize = 8;
// Executor name for general remote-effect admission.
const REMOTE_EFFECTS_EXECUTOR_NAME: &str = "fsqlite.remote_effects";
// Executor name reserved for the tiered-storage rollout slice (bd-28z4i.6).
#[allow(dead_code)]
pub(crate) const TIERED_STORAGE_EXECUTOR_NAME: &str = "fsqlite.tiered_storage";
// Tiered storage allows exactly one bounded waiter in its admission queue.
#[allow(dead_code)]
const DEFAULT_TIERED_STORAGE_QUEUE_DEPTH: usize = 1;
// Maximum time a tiered-storage waiter may sit queued before timing out.
#[allow(dead_code)]
const DEFAULT_TIERED_STORAGE_QUEUE_TIMEOUT: Duration = Duration::from_millis(250);
// Poll cadence while waiting on a queued admission entry (native path).
const ADMISSION_POLL_INTERVAL: Duration = Duration::from_millis(1);

/// Domain separator for deterministic remote idempotency keys.
pub const REMOTE_IDEMPOTENCY_DOMAIN: &str = "fsqlite:remote:v1";
56
/// Named remote computations (§4.19.2).
///
/// Closures are never shipped across the remote boundary; only these named
/// computations (plus explicitly registered `Custom` names) may be dispatched.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ComputationName {
    /// `symbol_get_range(object_id, esi_lo, esi_hi, ecs_epoch)`
    SymbolGetRange,
    /// `symbol_put_batch(object_id, symbols[], ecs_epoch)`
    SymbolPutBatch,
    /// `segment_put(segment_id, bytes, ecs_epoch)`
    SegmentPut,
    /// `segment_stat(segment_id, ecs_epoch)`
    SegmentStat,
    /// Explicit extension point; not accepted unless registered.
    Custom(String),
}
71
72impl ComputationName {
73    #[must_use]
74    pub fn as_str(&self) -> &str {
75        match self {
76            Self::SymbolGetRange => "symbol_get_range",
77            Self::SymbolPutBatch => "symbol_put_batch",
78            Self::SegmentPut => "segment_put",
79            Self::SegmentStat => "segment_stat",
80            Self::Custom(name) => name.as_str(),
81        }
82    }
83
84    #[must_use]
85    fn canonical_tag(&self) -> u8 {
86        match self {
87            Self::SymbolGetRange => 0x01,
88            Self::SymbolPutBatch => 0x02,
89            Self::SegmentPut => 0x03,
90            Self::SegmentStat => 0x04,
91            Self::Custom(_) => 0xFF,
92        }
93    }
94
95    #[must_use]
96    fn canonical_name_bytes(&self) -> Vec<u8> {
97        self.as_str().as_bytes().to_vec()
98    }
99}
100
/// Serialized remote computation request payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NamedComputation {
    /// Which named computation to invoke remotely.
    pub name: ComputationName,
    /// Opaque, already-serialized input for the computation.
    pub input_bytes: Vec<u8>,
}
107
108impl NamedComputation {
109    #[must_use]
110    pub const fn new(name: ComputationName, input_bytes: Vec<u8>) -> Self {
111        Self { name, input_bytes }
112    }
113
114    /// Build canonical bytes used for idempotency and auditing.
115    ///
116    /// Layout:
117    /// `[domain_len:u32][domain][tag:u8][name_len:u32][name][input_len:u32][input]`
118    ///
119    /// # Errors
120    ///
121    /// Returns `FrankenError::OutOfRange` if name/input lengths exceed `u32`.
122    pub fn canonical_request_bytes(&self) -> Result<Vec<u8>> {
123        let domain = REMOTE_IDEMPOTENCY_DOMAIN.as_bytes();
124        let name_bytes = self.name.canonical_name_bytes();
125
126        let domain_len = u32::try_from(domain.len()).map_err(|_| FrankenError::OutOfRange {
127            what: "remote_domain_len".to_owned(),
128            value: domain.len().to_string(),
129        })?;
130        let name_len = u32::try_from(name_bytes.len()).map_err(|_| FrankenError::OutOfRange {
131            what: "computation_name_len".to_owned(),
132            value: name_bytes.len().to_string(),
133        })?;
134        let input_len =
135            u32::try_from(self.input_bytes.len()).map_err(|_| FrankenError::OutOfRange {
136                what: "computation_input_len".to_owned(),
137                value: self.input_bytes.len().to_string(),
138            })?;
139
140        let mut out = Vec::with_capacity(
141            4 + domain.len() + 1 + 4 + name_bytes.len() + 4 + self.input_bytes.len(),
142        );
143        out.extend_from_slice(&domain_len.to_le_bytes());
144        out.extend_from_slice(domain);
145        out.push(self.name.canonical_tag());
146        out.extend_from_slice(&name_len.to_le_bytes());
147        out.extend_from_slice(&name_bytes);
148        out.extend_from_slice(&input_len.to_le_bytes());
149        out.extend_from_slice(&self.input_bytes);
150        Ok(out)
151    }
152}
153
/// Registry of allowed named remote computations.
///
/// Dispatch is validated against this allow-list; unregistered names
/// (including `Custom` ones) are rejected.
#[derive(Debug, Clone)]
pub struct ComputationRegistry {
    // Allow-list of dispatchable computation names.
    allowed: HashSet<ComputationName>,
}
159
160impl ComputationRegistry {
161    #[must_use]
162    pub fn new_empty() -> Self {
163        Self {
164            allowed: HashSet::new(),
165        }
166    }
167
168    #[must_use]
169    pub fn with_normative_names() -> Self {
170        let mut registry = Self::new_empty();
171        registry.register(ComputationName::SymbolGetRange);
172        registry.register(ComputationName::SymbolPutBatch);
173        registry.register(ComputationName::SegmentPut);
174        registry.register(ComputationName::SegmentStat);
175        registry
176    }
177
178    pub fn register(&mut self, name: ComputationName) {
179        self.allowed.insert(name);
180    }
181
182    #[must_use]
183    pub fn is_registered(&self, name: &ComputationName) -> bool {
184        self.allowed.contains(name)
185    }
186
187    /// Validate computation is registered for dispatch.
188    ///
189    /// # Errors
190    ///
191    /// Returns `FrankenError::Unsupported` if the computation is not registered.
192    pub fn validate(&self, name: &ComputationName) -> Result<()> {
193        if self.is_registered(name) {
194            Ok(())
195        } else {
196            Err(FrankenError::Unsupported)
197        }
198    }
199}
200
impl Default for ComputationRegistry {
    /// Defaults to the normative name set, not the empty registry.
    fn default() -> Self {
        Self::with_normative_names()
    }
}
206
/// Structured remote-effect log context.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TraceContext {
    /// Correlation id threaded through remote-effect log events.
    pub trace_id: String,
    /// Enclosing saga, when the effect runs inside one.
    pub saga_id: Option<Saga>,
    /// Dedup key for the request, when one has been derived.
    pub idempotency_key: Option<IdempotencyKey>,
    /// Attempt number for this effect — NOTE(review): zero- vs one-based not
    /// established by this module; confirm at call sites.
    pub attempt: u32,
    /// ECS epoch the effect was issued against.
    pub ecs_epoch: u64,
    /// Deterministic seed — presumably set when running under the lab
    /// harness; confirm against the harness wiring.
    pub lab_seed: Option<u64>,
    /// Schedule fingerprint — NOTE(review): semantics inferred from the name
    /// (deterministic-replay identification); confirm before relying on it.
    pub schedule_fingerprint: Option<String>,
}
218
219/// Derive deterministic idempotency key:
220/// `Trunc128(BLAKE3("fsqlite:remote:v1" || request_bytes))`.
221#[must_use]
222pub fn derive_idempotency_key(request_bytes: &[u8]) -> IdempotencyKey {
223    let mut hasher = Hasher::new();
224    hasher.update(REMOTE_IDEMPOTENCY_DOMAIN.as_bytes());
225    hasher.update(request_bytes);
226    let digest = hasher.finalize();
227    let mut out = [0_u8; 16];
228    out.copy_from_slice(&digest.as_bytes()[..16]);
229    IdempotencyKey::from_bytes(out)
230}
231
232#[must_use]
233fn request_digest(request_bytes: &[u8]) -> [u8; 32] {
234    let mut hasher = Hasher::new();
235    hasher.update(request_bytes);
236    *hasher.finalize().as_bytes()
237}
238
/// Deduplication outcome for an idempotent remote request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IdempotencyDecision {
    /// First sighting of the key; the outcome was stored.
    StoredNew(Vec<u8>),
    /// Key seen before with an identical request; prior outcome replayed.
    Replayed(Vec<u8>),
}
245
/// Stored record binding an idempotency key to its request and outcome.
#[derive(Debug, Clone)]
struct IdempotencyEntry {
    // Computation the key was first registered under.
    computation: ComputationName,
    // BLAKE3 digest of the original request bytes (conflict detection).
    request_digest: [u8; 32],
    // Outcome bytes replayed on duplicate requests.
    outcome: Vec<u8>,
}
252
/// In-memory idempotency store for remote effects.
///
/// Thread-safe via an internal mutex; lock poisoning is tolerated (the map is
/// recovered from a poisoned lock rather than panicking).
#[derive(Debug, Default)]
pub struct IdempotencyStore {
    // Entries keyed by idempotency key, guarded by a poison-tolerant mutex.
    entries: Mutex<HashMap<IdempotencyKey, IdempotencyEntry>>,
}
258
259impl IdempotencyStore {
260    #[must_use]
261    pub fn new() -> Self {
262        Self::default()
263    }
264
265    /// Register outcome for `(key, computation, request)` or replay prior value.
266    ///
267    /// # Errors
268    ///
269    /// Returns `FrankenError::Internal` if the same idempotency key is reused
270    /// with different request bytes or a different computation name.
271    pub fn register_or_replay(
272        &self,
273        key: IdempotencyKey,
274        computation: &ComputationName,
275        request_bytes: &[u8],
276        outcome: &[u8],
277    ) -> Result<IdempotencyDecision> {
278        let digest = request_digest(request_bytes);
279        let mut guard = self
280            .entries
281            .lock()
282            .unwrap_or_else(std::sync::PoisonError::into_inner);
283
284        if let Some(existing) = guard.get(&key) {
285            if existing.request_digest == digest && existing.computation == *computation {
286                return Ok(IdempotencyDecision::Replayed(existing.outcome.clone()));
287            }
288            return Err(FrankenError::Internal(
289                "idempotency conflict: same key used for different remote request".to_owned(),
290            ));
291        }
292
293        guard.insert(
294            key,
295            IdempotencyEntry {
296                computation: computation.clone(),
297                request_digest: digest,
298                outcome: outcome.to_vec(),
299            },
300        );
301        drop(guard);
302        Ok(IdempotencyDecision::StoredNew(outcome.to_vec()))
303    }
304}
305
306/// Require a runtime RemoteCap in addition to type-level `HasRemote`.
307///
308/// # Errors
309///
310/// Returns `FrankenError::Internal` if `remote_cap` is `None`.
311pub fn require_remote_cap<Caps>(_: &Cx<Caps>, remote_cap: Option<RemoteCap>) -> Result<RemoteCap>
312where
313    Caps: cap::SubsetOf<cap::All> + cap::HasRemote,
314{
315    remote_cap.ok_or_else(|| {
316        FrankenError::Internal("remote capability token missing for remote effect".to_owned())
317    })
318}
319
320/// Conservative default for `fsqlite.remote_max_in_flight` (balanced profile).
321///
322/// Formula: `clamp(P / 8, 1, 8)` where `P = available_parallelism`.
323#[must_use]
324pub const fn conservative_remote_max_in_flight(parallelism: usize) -> usize {
325    let base = parallelism / 8;
326    if base == 0 {
327        1
328    } else if base > MAX_BALANCED_REMOTE_IN_FLIGHT {
329        MAX_BALANCED_REMOTE_IN_FLIGHT
330    } else {
331        base
332    }
333}
334
/// Snapshotted admission state for remote work.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct AdmissionSnapshot {
    /// Permits currently held by in-flight operations.
    pub active_permits: usize,
    /// Waiters currently queued for admission.
    pub queue_depth: usize,
    /// Cumulative count of rejected admission attempts.
    pub total_rejected: u64,
    /// Cumulative count of cancelled admission attempts.
    pub total_cancelled: u64,
}
343
/// Current wall-clock time as admission-controller millis since the Unix
/// epoch, saturating (to 0 on pre-epoch clocks, to `u64::MAX` on overflow).
#[cfg(feature = "native")]
#[must_use]
fn admission_now() -> AdmissionTime {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let millis = u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX);
    AdmissionTime::from_millis(millis)
}
354
/// Executor for remote operations guarded by a global bulkhead.
#[derive(Debug)]
pub struct Executor {
    // Stable executor name used in log events and error contexts.
    name: &'static str,
    // Maximum concurrently admitted operations.
    max_in_flight: usize,
    // Maximum queued waiters once all permits are taken (0 = reject at once).
    max_queue: usize,
    // How long a queued waiter may wait before timing out.
    queue_timeout: Duration,
    // Underlying admission controller: asupersync bulkhead on native builds,
    // crate-local fallback otherwise.
    bulkhead: AdmissionBulkhead,
    // Fallback-only counters: the local bulkhead does not track these itself.
    #[cfg(not(feature = "native"))]
    queued_waiters: AtomicUsize,
    #[cfg(not(feature = "native"))]
    total_rejected: AtomicU64,
    #[cfg(not(feature = "native"))]
    total_cancelled: AtomicU64,
}
370
371impl Executor {
372    /// Build executor from `PRAGMA fsqlite.remote_max_in_flight`.
373    ///
374    /// `0` means "auto" and resolves to the conservative balanced default.
375    ///
376    /// # Errors
377    ///
378    /// Returns `FrankenError::OutOfRange` when `remote_max_in_flight` is
379    /// non-zero but invalid.
380    pub fn from_pragma_remote_max_in_flight(remote_max_in_flight: usize) -> Result<Self> {
381        if remote_max_in_flight == 0 {
382            Ok(Self::balanced_default())
383        } else {
384            Self::with_max_in_flight(remote_max_in_flight)
385        }
386    }
387
    /// Create with explicit in-flight limit.
    ///
    /// # Errors
    ///
    /// Returns `FrankenError::OutOfRange` if `max_in_flight == 0`.
    pub fn with_max_in_flight(max_in_flight: usize) -> Result<Self> {
        // Zero queue depth and zero timeout: saturated admission rejects
        // immediately rather than queueing.
        Self::with_limits(
            REMOTE_EFFECTS_EXECUTOR_NAME,
            max_in_flight,
            0,
            Duration::ZERO,
        )
    }
401
402    #[must_use]
403    pub fn balanced_default() -> Self {
404        let p = available_parallelism_or_one();
405        let max_in_flight = conservative_remote_max_in_flight(p);
406        Self::with_max_in_flight(max_in_flight)
407            .expect("remote balanced max_in_flight is always >= 1")
408    }
409
    /// Low-risk first production rollout slice for `bd-28z4i.6`: keep the
    /// existing conservative concurrency cap, but allow one bounded waiter so
    /// `&Cx` cancellation can unwind tiered-storage remote admission cleanly.
    #[must_use]
    #[allow(dead_code)]
    pub(crate) fn balanced_tiered_storage_default() -> Self {
        let p = available_parallelism_or_one();
        let max_in_flight = conservative_remote_max_in_flight(p);
        // Same cap as the balanced remote default, plus a depth-1 queue with
        // a 250 ms timeout (the tiered-storage constants at module top).
        Self::with_limits(
            TIERED_STORAGE_EXECUTOR_NAME,
            max_in_flight,
            DEFAULT_TIERED_STORAGE_QUEUE_DEPTH,
            DEFAULT_TIERED_STORAGE_QUEUE_TIMEOUT,
        )
        .expect("tiered storage balanced max_in_flight is always >= 1")
    }
426
    /// Create a named executor with explicit queue bounds.
    ///
    /// # Errors
    ///
    /// Returns `FrankenError::OutOfRange` when any configured limit does not
    /// fit the underlying admission controller.
    pub(crate) fn with_limits(
        name: &'static str,
        max_in_flight: usize,
        max_queue: usize,
        queue_timeout: Duration,
    ) -> Result<Self> {
        // The native bulkhead policy stores limits as u32; validate up front
        // so both cfg branches share the zero/overflow check.
        let max_in_flight_u32 =
            u32::try_from(max_in_flight).map_err(|_| FrankenError::OutOfRange {
                what: format!("{name}.max_in_flight"),
                value: max_in_flight.to_string(),
            })?;
        if max_in_flight_u32 == 0 {
            return Err(FrankenError::OutOfRange {
                what: format!("{name}.max_in_flight"),
                value: max_in_flight.to_string(),
            });
        }
        #[cfg(feature = "native")]
        let bulkhead = {
            let max_queue_u32 = u32::try_from(max_queue).map_err(|_| FrankenError::OutOfRange {
                what: format!("{name}.max_queue"),
                value: max_queue.to_string(),
            })?;
            AdmissionBulkhead::new(BulkheadPolicy {
                name: name.to_owned(),
                max_concurrent: max_in_flight_u32,
                max_queue: max_queue_u32,
                queue_timeout,
                weighted: false,
                on_full: None,
            })
        };
        #[cfg(not(feature = "native"))]
        let bulkhead = {
            // Fallback bulkhead carries no internal queue; queueing is
            // emulated with `queued_waiters`, so its queue depth is fixed at 0.
            let config = BulkheadConfig::new(max_in_flight, 0, OverflowPolicy::DropBusy)
                .ok_or_else(|| FrankenError::OutOfRange {
                    what: format!("{name}.max_in_flight"),
                    value: max_in_flight.to_string(),
                })?;
            AdmissionBulkhead::new(config)
        };

        Ok(Self {
            name,
            max_in_flight,
            max_queue,
            queue_timeout,
            bulkhead,
            #[cfg(not(feature = "native"))]
            queued_waiters: AtomicUsize::new(0),
            #[cfg(not(feature = "native"))]
            total_rejected: AtomicU64::new(0),
            #[cfg(not(feature = "native"))]
            total_cancelled: AtomicU64::new(0),
        })
    }
489
    /// Executor name used in structured log events.
    #[must_use]
    pub const fn name(&self) -> &'static str {
        self.name
    }

    /// Configured maximum number of concurrently admitted operations.
    #[must_use]
    pub const fn max_in_flight(&self) -> usize {
        self.max_in_flight
    }

    /// Configured maximum number of queued waiters.
    #[must_use]
    pub const fn max_queue(&self) -> usize {
        self.max_queue
    }
504
    /// Point-in-time view of admission state (permits, queue, counters).
    #[must_use]
    pub fn snapshot(&self) -> AdmissionSnapshot {
        #[cfg(feature = "native")]
        {
            // Native path: the asupersync bulkhead tracks all four counters.
            let metrics: AdmissionBulkheadMetrics = self.bulkhead.metrics();
            AdmissionSnapshot {
                #[allow(clippy::cast_possible_truncation)]
                active_permits: metrics.active_permits as usize,
                #[allow(clippy::cast_possible_truncation)]
                queue_depth: metrics.queue_depth as usize,
                total_rejected: metrics.total_rejected,
                total_cancelled: metrics.total_cancelled,
            }
        }
        #[cfg(not(feature = "native"))]
        {
            // Fallback path: the local bulkhead only knows the in-flight
            // count; queue/reject/cancel counters live on this executor.
            AdmissionSnapshot {
                active_permits: self.bulkhead.in_flight(),
                queue_depth: self.queued_waiters.load(Ordering::Acquire),
                total_rejected: self.total_rejected.load(Ordering::Acquire),
                total_cancelled: self.total_cancelled.load(Ordering::Acquire),
            }
        }
    }
529
530    #[cfg(not(feature = "native"))]
531    fn reserve_fallback_waiter(&self) -> bool {
532        loop {
533            let current = self.queued_waiters.load(Ordering::Acquire);
534            if current >= self.max_queue {
535                return false;
536            }
537            let next = current + 1;
538            if self
539                .queued_waiters
540                .compare_exchange_weak(current, next, Ordering::AcqRel, Ordering::Acquire)
541                .is_ok()
542            {
543                return true;
544            }
545        }
546    }
547
    /// Test-only hook: try to take a permit without logging or queueing.
    #[cfg(test)]
    #[allow(dead_code)]
    pub(crate) fn try_acquire_for_testing(&self) -> Option<AdmissionPermit<'_>> {
        #[cfg(feature = "native")]
        {
            // Native bulkhead returns Option directly (permit weight 1).
            self.bulkhead.try_acquire(1)
        }
        #[cfg(not(feature = "native"))]
        {
            // Fallback bulkhead returns Result; flatten the error to None.
            self.bulkhead.try_acquire().ok()
        }
    }
560
    /// Run `operation` under admission control with structured logging.
    ///
    /// Acquires a permit (queueing if configured), re-checks cancellation
    /// after admission but before dispatch, then invokes `operation`.
    /// Cancellation at either checkpoint surfaces as `FrankenError::Busy`.
    /// The permit is held for the whole call via `_permit`'s drop scope.
    pub(crate) fn run<Caps, T, F>(
        &self,
        cx: &Cx<Caps>,
        effect_name: &str,
        saga: Option<Saga>,
        idempotency_key: Option<IdempotencyKey>,
        ecs_epoch: u64,
        operation: F,
    ) -> Result<T>
    where
        Caps: cap::SubsetOf<cap::All>,
        F: FnOnce() -> Result<T>,
    {
        let _permit = self.acquire(cx, effect_name, saga, idempotency_key, ecs_epoch)?;
        // Cancellation may have landed while we waited for admission; bail
        // out before doing any remote work (the permit is released on drop).
        cx.checkpoint().map_err(|_| {
            let snapshot = self.snapshot();
            warn!(
                bead_id = BEAD_ID,
                executor = self.name,
                effect_name,
                saga_id = format_saga(saga),
                idempotency_key = format_key(idempotency_key),
                ecs_epoch,
                admission = "cancelled_post_acquire",
                active_permits = snapshot.active_permits,
                queue_depth = snapshot.queue_depth,
                total_cancelled = snapshot.total_cancelled,
                "remote operation cancelled after admission and before dispatch"
            );
            FrankenError::Busy
        })?;
        match operation() {
            Ok(out) => {
                let snapshot = self.snapshot();
                info!(
                    bead_id = BEAD_ID,
                    executor = self.name,
                    effect_name,
                    saga_id = format_saga(saga),
                    idempotency_key = format_key(idempotency_key),
                    ecs_epoch,
                    active_permits = snapshot.active_permits,
                    queue_depth = snapshot.queue_depth,
                    "remote operation completed under admission control"
                );
                Ok(out)
            }
            Err(err) => {
                let snapshot = self.snapshot();
                warn!(
                    bead_id = BEAD_ID,
                    executor = self.name,
                    effect_name,
                    saga_id = format_saga(saga),
                    idempotency_key = format_key(idempotency_key),
                    ecs_epoch,
                    active_permits = snapshot.active_permits,
                    queue_depth = snapshot.queue_depth,
                    error = %err,
                    "remote operation failed under admission control"
                );
                Err(err)
            }
        }
    }
626
627    fn acquire<Caps>(
628        &self,
629        cx: &Cx<Caps>,
630        effect_name: &str,
631        saga: Option<Saga>,
632        idempotency_key: Option<IdempotencyKey>,
633        ecs_epoch: u64,
634    ) -> Result<AdmissionPermit<'_>>
635    where
636        Caps: cap::SubsetOf<cap::All>,
637    {
638        cx.checkpoint().map_err(|_| FrankenError::Busy)?;
639
640        #[cfg(feature = "native")]
641        {
642            if let Some(permit) = self.bulkhead.try_acquire(1) {
643                let snapshot = self.snapshot();
644                debug!(
645                    bead_id = BEAD_ID,
646                    executor = self.name,
647                    effect_name,
648                    saga_id = format_saga(saga),
649                    idempotency_key = format_key(idempotency_key),
650                    ecs_epoch,
651                    admission = "immediate",
652                    active_permits = snapshot.active_permits,
653                    queue_depth = snapshot.queue_depth,
654                    max_in_flight = self.max_in_flight,
655                    max_queue = self.max_queue,
656                    "remote admission granted immediately"
657                );
658                return Ok(permit);
659            }
660
661            let snapshot = self.snapshot();
662            if self.max_queue == 0 {
663                warn!(
664                    bead_id = BEAD_ID,
665                    executor = self.name,
666                    effect_name,
667                    saga_id = format_saga(saga),
668                    idempotency_key = format_key(idempotency_key),
669                    ecs_epoch,
670                    admission = "rejected",
671                    active_permits = snapshot.active_permits,
672                    queue_depth = snapshot.queue_depth,
673                    total_rejected = snapshot.total_rejected,
674                    "remote admission saturated"
675                );
676                return Err(FrankenError::Busy);
677            }
678
679            let queued_at = admission_now();
680            let entry_id = match self.bulkhead.enqueue(1, queued_at) {
681                Ok(entry_id) => entry_id,
682                Err(AdmissionBulkheadError::Full | AdmissionBulkheadError::QueueFull) => {
683                    let snapshot = self.snapshot();
684                    warn!(
685                        bead_id = BEAD_ID,
686                        executor = self.name,
687                        effect_name,
688                        saga_id = format_saga(saga),
689                        idempotency_key = format_key(idempotency_key),
690                        ecs_epoch,
691                        admission = "rejected",
692                        active_permits = snapshot.active_permits,
693                        queue_depth = snapshot.queue_depth,
694                        total_rejected = snapshot.total_rejected,
695                        "remote admission queue full"
696                    );
697                    return Err(FrankenError::Busy);
698                }
699                Err(
700                    AdmissionBulkheadError::QueueTimeout { .. } | AdmissionBulkheadError::Cancelled,
701                ) => {
702                    return Err(FrankenError::Busy);
703                }
704                Err(AdmissionBulkheadError::Inner(())) => unreachable!(),
705            };
706
707            let snapshot = self.snapshot();
708            debug!(
709                bead_id = BEAD_ID,
710                executor = self.name,
711                effect_name,
712                saga_id = format_saga(saga),
713                idempotency_key = format_key(idempotency_key),
714                ecs_epoch,
715                admission = "queued",
716                entry_id,
717                queue_depth = snapshot.queue_depth,
718                queue_timeout_ms = self.queue_timeout.as_millis(),
719                "remote admission queued"
720            );
721
722            loop {
723                if cx.checkpoint().is_err() {
724                    self.bulkhead.cancel_entry(entry_id, AdmissionTime::ZERO);
725                    let snapshot = self.snapshot();
726                    warn!(
727                        bead_id = BEAD_ID,
728                        executor = self.name,
729                        effect_name,
730                        saga_id = format_saga(saga),
731                        idempotency_key = format_key(idempotency_key),
732                        ecs_epoch,
733                        admission = "cancelled",
734                        entry_id,
735                        queue_depth = snapshot.queue_depth,
736                        total_cancelled = snapshot.total_cancelled,
737                        "remote admission cancelled while waiting"
738                    );
739                    return Err(FrankenError::Busy);
740                }
741
742                let now = admission_now();
743                match self.bulkhead.check_entry(entry_id, now) {
744                    Ok(Some(permit)) => {
745                        let waited_ms = now.as_millis().saturating_sub(queued_at.as_millis());
746                        let snapshot = self.snapshot();
747                        debug!(
748                            bead_id = BEAD_ID,
749                            executor = self.name,
750                            effect_name,
751                            saga_id = format_saga(saga),
752                            idempotency_key = format_key(idempotency_key),
753                            ecs_epoch,
754                            admission = "dequeued",
755                            entry_id,
756                            waited_ms,
757                            active_permits = snapshot.active_permits,
758                            queue_depth = snapshot.queue_depth,
759                            "remote admission granted from queue"
760                        );
761                        return Ok(permit);
762                    }
763                    Ok(None) => thread::sleep(ADMISSION_POLL_INTERVAL),
764                    Err(AdmissionBulkheadError::QueueTimeout { waited }) => {
765                        let snapshot = self.snapshot();
766                        warn!(
767                            bead_id = BEAD_ID,
768                            executor = self.name,
769                            effect_name,
770                            saga_id = format_saga(saga),
771                            idempotency_key = format_key(idempotency_key),
772                            ecs_epoch,
773                            admission = "timed_out",
774                            entry_id,
775                            waited_ms = waited.as_millis(),
776                            queue_depth = snapshot.queue_depth,
777                            total_rejected = snapshot.total_rejected,
778                            "remote admission queue timeout"
779                        );
780                        return Err(FrankenError::Busy);
781                    }
782                    Err(
783                        AdmissionBulkheadError::Cancelled
784                        | AdmissionBulkheadError::Full
785                        | AdmissionBulkheadError::QueueFull,
786                    ) => {
787                        return Err(FrankenError::Busy);
788                    }
789                    Err(AdmissionBulkheadError::Inner(())) => unreachable!(),
790                }
791            }
792        }
793
794        #[cfg(not(feature = "native"))]
795        {
796            if let Ok(permit) = self.bulkhead.try_acquire() {
797                let snapshot = self.snapshot();
798                debug!(
799                    bead_id = BEAD_ID,
800                    executor = self.name,
801                    effect_name,
802                    saga_id = format_saga(saga),
803                    idempotency_key = format_key(idempotency_key),
804                    ecs_epoch,
805                    admission = "immediate",
806                    active_permits = snapshot.active_permits,
807                    queue_depth = snapshot.queue_depth,
808                    max_in_flight = self.max_in_flight,
809                    max_queue = self.max_queue,
810                    "remote admission granted via local fallback bulkhead"
811                );
812                return Ok(permit);
813            }
814
815            if self.max_queue == 0 {
816                self.total_rejected.fetch_add(1, Ordering::AcqRel);
817                let snapshot = self.snapshot();
818                warn!(
819                    bead_id = BEAD_ID,
820                    executor = self.name,
821                    effect_name,
822                    saga_id = format_saga(saga),
823                    idempotency_key = format_key(idempotency_key),
824                    ecs_epoch,
825                    admission = "rejected",
826                    active_permits = snapshot.active_permits,
827                    queue_depth = snapshot.queue_depth,
828                    total_rejected = snapshot.total_rejected,
829                    "remote admission saturated"
830                );
831                return Err(FrankenError::Busy);
832            }
833
834            if !self.reserve_fallback_waiter() {
835                self.total_rejected.fetch_add(1, Ordering::AcqRel);
836                let snapshot = self.snapshot();
837                warn!(
838                    bead_id = BEAD_ID,
839                    executor = self.name,
840                    effect_name,
841                    saga_id = format_saga(saga),
842                    idempotency_key = format_key(idempotency_key),
843                    ecs_epoch,
844                    admission = "rejected",
845                    active_permits = snapshot.active_permits,
846                    queue_depth = snapshot.queue_depth,
847                    total_rejected = snapshot.total_rejected,
848                    "remote admission queue full"
849                );
850                return Err(FrankenError::Busy);
851            }
852
853            let queued_at = Instant::now();
854            let queued = FallbackQueueReservation::new(&self.queued_waiters);
855            let snapshot = self.snapshot();
856            debug!(
857                bead_id = BEAD_ID,
858                executor = self.name,
859                effect_name,
860                saga_id = format_saga(saga),
861                idempotency_key = format_key(idempotency_key),
862                ecs_epoch,
863                admission = "queued",
864                queue_depth = snapshot.queue_depth,
865                queue_timeout_ms = self.queue_timeout.as_millis(),
866                "remote admission queued via local fallback bulkhead"
867            );
868
869            loop {
870                if cx.checkpoint().is_err() {
871                    self.total_cancelled.fetch_add(1, Ordering::AcqRel);
872                    queued.release();
873                    let snapshot = self.snapshot();
874                    warn!(
875                        bead_id = BEAD_ID,
876                        executor = self.name,
877                        effect_name,
878                        saga_id = format_saga(saga),
879                        idempotency_key = format_key(idempotency_key),
880                        ecs_epoch,
881                        admission = "cancelled",
882                        queue_depth = snapshot.queue_depth,
883                        total_cancelled = snapshot.total_cancelled,
884                        "remote admission cancelled while waiting"
885                    );
886                    return Err(FrankenError::Busy);
887                }
888
889                match self.bulkhead.try_acquire() {
890                    Ok(permit) => {
891                        let waited_ms =
892                            u64::try_from(queued_at.elapsed().as_millis()).unwrap_or(u64::MAX);
893                        queued.release();
894                        let snapshot = self.snapshot();
895                        debug!(
896                            bead_id = BEAD_ID,
897                            executor = self.name,
898                            effect_name,
899                            saga_id = format_saga(saga),
900                            idempotency_key = format_key(idempotency_key),
901                            ecs_epoch,
902                            admission = "dequeued",
903                            waited_ms,
904                            active_permits = snapshot.active_permits,
905                            queue_depth = snapshot.queue_depth,
906                            "remote admission granted from local fallback queue"
907                        );
908                        return Ok(permit);
909                    }
910                    Err(FrankenError::Busy) => {
911                        if queued_at.elapsed() >= self.queue_timeout {
912                            self.total_rejected.fetch_add(1, Ordering::AcqRel);
913                            let waited_ms =
914                                u64::try_from(queued_at.elapsed().as_millis()).unwrap_or(u64::MAX);
915                            queued.release();
916                            let snapshot = self.snapshot();
917                            warn!(
918                                bead_id = BEAD_ID,
919                                executor = self.name,
920                                effect_name,
921                                saga_id = format_saga(saga),
922                                idempotency_key = format_key(idempotency_key),
923                                ecs_epoch,
924                                admission = "timed_out",
925                                waited_ms,
926                                queue_depth = snapshot.queue_depth,
927                                total_rejected = snapshot.total_rejected,
928                                "remote admission queue timeout"
929                            );
930                            return Err(FrankenError::Busy);
931                        }
932                        thread::sleep(ADMISSION_POLL_INTERVAL);
933                    }
934                    Err(err) => return Err(err),
935                }
936            }
937        }
938    }
939
940    /// Execute a named remote computation through the global remote bulkhead.
941    ///
942    /// # Errors
943    ///
944    /// Returns:
945    /// - `FrankenError::Internal` when `remote_cap` is absent,
946    /// - `FrankenError::Unsupported` for unregistered computations,
947    /// - `FrankenError::Busy` when the remote bulkhead is saturated,
948    /// - or any error from `operation`.
949    pub fn execute<Caps, F>(
950        &self,
951        cx: &Cx<Caps>,
952        remote_cap: Option<RemoteCap>,
953        registry: &ComputationRegistry,
954        computation: &NamedComputation,
955        trace: &TraceContext,
956        operation: F,
957    ) -> Result<Vec<u8>>
958    where
959        Caps: cap::SubsetOf<cap::All> + cap::HasRemote,
960        F: FnOnce() -> Result<Vec<u8>>,
961    {
962        let _cap = require_remote_cap(cx, remote_cap)?;
963        registry.validate(&computation.name)?;
964        debug!(
965            bead_id = BEAD_ID,
966            trace_id = trace.trace_id,
967            effect_name = computation.name.as_str(),
968            saga_id = format_saga(trace.saga_id),
969            idempotency_key = format_key(trace.idempotency_key),
970            attempt = trace.attempt,
971            ecs_epoch = trace.ecs_epoch,
972            lab_seed = ?trace.lab_seed,
973            schedule_fingerprint = ?trace.schedule_fingerprint,
974            "dispatching named remote computation"
975        );
976        let out = self.run(
977            cx,
978            computation.name.as_str(),
979            trace.saga_id,
980            trace.idempotency_key,
981            trace.ecs_epoch,
982            operation,
983        )?;
984
985        info!(
986            bead_id = BEAD_ID,
987            trace_id = trace.trace_id,
988            effect_name = computation.name.as_str(),
989            saga_id = format_saga(trace.saga_id),
990            idempotency_key = format_key(trace.idempotency_key),
991            attempt = trace.attempt,
992            ecs_epoch = trace.ecs_epoch,
993            "remote computation completed"
994        );
995
996        Ok(out)
997    }
998}
999
/// Lease-expiry escalation policy.
///
/// Each variant maps to a deterministic error in [`LeaseBackedHandle::enforce`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseEscalation {
    /// Expiry surfaces as `FrankenError::Busy`.
    Cancel,
    /// Expiry surfaces as `FrankenError::BusyRecovery` (caller may retry).
    Retry,
    /// Expiry surfaces as `FrankenError::LockFailed` (hard failure).
    Fail,
}
1007
/// Lease liveness result.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseStatus {
    /// Lease age is still strictly below its TTL.
    Live,
    /// Lease TTL has elapsed; carries the handle's configured escalation.
    Expired { escalation: LeaseEscalation },
}
1014
/// Lease-backed remote handle metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LeaseBackedHandle {
    /// Opaque lease identifier; included in expiry warn logs for correlation.
    pub lease_id: u64,
    /// Issue timestamp in milliseconds; age is measured against this.
    pub issued_at_millis: u64,
    /// Time-to-live in milliseconds; must be non-zero (enforced by `new`).
    pub ttl_millis: u64,
    /// Escalation applied when the lease is found expired.
    pub escalation: LeaseEscalation,
}
1023
1024impl LeaseBackedHandle {
1025    /// Create a lease-backed handle.
1026    ///
1027    /// # Errors
1028    ///
1029    /// Returns `FrankenError::OutOfRange` if `ttl_millis == 0`.
1030    pub fn new(
1031        lease_id: u64,
1032        issued_at_millis: u64,
1033        ttl_millis: u64,
1034        escalation: LeaseEscalation,
1035    ) -> Result<Self> {
1036        if ttl_millis == 0 {
1037            return Err(FrankenError::OutOfRange {
1038                what: "lease_ttl_millis".to_owned(),
1039                value: "0".to_owned(),
1040            });
1041        }
1042        Ok(Self {
1043            lease_id,
1044            issued_at_millis,
1045            ttl_millis,
1046            escalation,
1047        })
1048    }
1049
1050    #[must_use]
1051    pub fn evaluate(&self, now_millis: u64) -> LeaseStatus {
1052        let age_millis = now_millis.saturating_sub(self.issued_at_millis);
1053        if age_millis >= self.ttl_millis {
1054            LeaseStatus::Expired {
1055                escalation: self.escalation,
1056            }
1057        } else {
1058            LeaseStatus::Live
1059        }
1060    }
1061
1062    /// Enforce lease validity and map expiry to deterministic escalation errors.
1063    ///
1064    /// # Errors
1065    ///
1066    /// Returns:
1067    /// - `FrankenError::Busy` for `Cancel`,
1068    /// - `FrankenError::BusyRecovery` for `Retry`,
1069    /// - `FrankenError::LockFailed` for `Fail`.
1070    pub fn enforce(&self, now_millis: u64, trace: &TraceContext) -> Result<()> {
1071        match self.evaluate(now_millis) {
1072            LeaseStatus::Live => Ok(()),
1073            LeaseStatus::Expired { escalation } => {
1074                warn!(
1075                    bead_id = BEAD_ID,
1076                    trace_id = trace.trace_id,
1077                    lease_id = self.lease_id,
1078                    effect_name = "lease_expiry",
1079                    saga_id = format_saga(trace.saga_id),
1080                    idempotency_key = format_key(trace.idempotency_key),
1081                    attempt = trace.attempt,
1082                    ecs_epoch = trace.ecs_epoch,
1083                    escalation = ?escalation,
1084                    "remote lease expired; escalating"
1085                );
1086                match escalation {
1087                    LeaseEscalation::Cancel => Err(FrankenError::Busy),
1088                    LeaseEscalation::Retry => Err(FrankenError::BusyRecovery),
1089                    LeaseEscalation::Fail => Err(FrankenError::LockFailed {
1090                        detail: "remote lease expired".to_owned(),
1091                    }),
1092                }
1093            }
1094        }
1095    }
1096}
1097
/// Local deterministic remote segment store for tests.
#[derive(Debug, Default)]
pub struct InMemoryRemoteStore {
    /// Remote-visible segment bytes keyed by object id.
    segments: HashMap<ObjectId, Vec<u8>>,
    /// Dedup ledger: idempotency key -> what was uploaded under that key.
    uploads: HashMap<IdempotencyKey, UploadRecord>,
    /// Number of distinct (non-replayed) uploads observed per segment.
    upload_count: HashMap<ObjectId, u64>,
}
1105
/// What a previously-seen idempotency key committed to: the target segment
/// and a 32-byte digest of the payload (computed via `request_digest`).
#[derive(Debug, Clone)]
struct UploadRecord {
    segment_id: ObjectId,
    payload_digest: [u8; 32],
}
1111
1112impl InMemoryRemoteStore {
1113    #[must_use]
1114    pub fn new() -> Self {
1115        Self::default()
1116    }
1117
1118    /// Idempotent segment upload keyed by idempotency key.
1119    ///
1120    /// # Errors
1121    ///
1122    /// Returns `FrankenError::Internal` when an existing idempotency key is
1123    /// reused with a different segment/payload.
1124    pub fn put_segment(
1125        &mut self,
1126        segment_id: ObjectId,
1127        payload: &[u8],
1128        key: IdempotencyKey,
1129    ) -> Result<()> {
1130        let digest = request_digest(payload);
1131        if let Some(existing) = self.uploads.get(&key) {
1132            if existing.segment_id == segment_id && existing.payload_digest == digest {
1133                // Preserve idempotency while ensuring deterministic replay can
1134                // reconstruct remote-visible state after compensation cleanup.
1135                self.segments
1136                    .entry(segment_id)
1137                    .or_insert_with(|| payload.to_vec());
1138                return Ok(());
1139            }
1140            return Err(FrankenError::Internal(
1141                "remote put conflict: idempotency key reused with different payload".to_owned(),
1142            ));
1143        }
1144
1145        self.uploads.insert(
1146            key,
1147            UploadRecord {
1148                segment_id,
1149                payload_digest: digest,
1150            },
1151        );
1152        self.segments.insert(segment_id, payload.to_vec());
1153        *self.upload_count.entry(segment_id).or_insert(0) += 1;
1154        Ok(())
1155    }
1156
1157    #[must_use]
1158    pub fn has_segment(&self, segment_id: ObjectId) -> bool {
1159        self.segments.contains_key(&segment_id)
1160    }
1161
1162    #[must_use]
1163    pub fn upload_count(&self, segment_id: ObjectId) -> u64 {
1164        *self.upload_count.get(&segment_id).unwrap_or(&0)
1165    }
1166
1167    pub fn remove_segment(&mut self, segment_id: ObjectId) -> bool {
1168        self.segments.remove(&segment_id).is_some()
1169    }
1170}
1171
/// Eviction saga phase.
///
/// Legal transitions (see `EvictionSaga`):
/// `Init -> Uploaded -> Verified -> Retired`, with `Cancelled` reachable from
/// any pre-`Retired` phase and re-entrant into `upload`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvictionPhase {
    /// Saga created; nothing uploaded yet.
    Init,
    /// Segment bytes uploaded to the remote store.
    Uploaded,
    /// Remote presence confirmed via `verify_remote`.
    Verified,
    /// Local copy retired; eviction complete.
    Retired,
    /// Saga cancelled; local copy retained.
    Cancelled,
}
1181
/// Local segment state during eviction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LocalSegmentState {
    /// Local bytes still held; saga replay remains safe.
    Present,
    /// Local bytes released after successful remote verification.
    Retired,
}
1188
/// Compensation outcome when cancelling an eviction saga.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvictionCompensation {
    /// Cancelled before local retirement; the local copy stays authoritative.
    LocalRetained,
    /// Cancelled after `Retired`; local data is gone, so a higher-level
    /// recovery path must roll back.
    RollbackRequired,
}
1195
/// L2->L3 eviction saga skeleton (`upload -> verify -> retire`).
#[derive(Debug)]
pub struct EvictionSaga {
    /// Owning saga; its key seeds the deterministic step idempotency key.
    saga: Saga,
    /// Segment being evicted.
    segment_id: ObjectId,
    /// Current phase; guards each step's entry condition.
    phase: EvictionPhase,
    /// Whether the local copy is still present or already retired.
    local_state: LocalSegmentState,
    /// Derived once in `new`, so a restarted saga replays the same upload key.
    upload_idempotency_key: IdempotencyKey,
}
1205
1206impl EvictionSaga {
1207    #[must_use]
1208    pub fn new(saga: Saga, segment_id: ObjectId) -> Self {
1209        Self {
1210            upload_idempotency_key: derive_step_key(saga.key(), segment_id, b"segment_put"),
1211            saga,
1212            segment_id,
1213            phase: EvictionPhase::Init,
1214            local_state: LocalSegmentState::Present,
1215        }
1216    }
1217
1218    #[must_use]
1219    pub const fn phase(&self) -> EvictionPhase {
1220        self.phase
1221    }
1222
1223    #[must_use]
1224    pub const fn local_state(&self) -> LocalSegmentState {
1225        self.local_state
1226    }
1227
1228    #[must_use]
1229    pub const fn upload_idempotency_key(&self) -> IdempotencyKey {
1230        self.upload_idempotency_key
1231    }
1232
1233    /// Upload step (`segment_put`).
1234    ///
1235    /// # Errors
1236    ///
1237    /// Returns `FrankenError::Internal` when called from an invalid phase or if
1238    /// remote idempotency validation fails.
1239    pub fn upload(&mut self, remote: &mut InMemoryRemoteStore, bytes: &[u8]) -> Result<()> {
1240        if !matches!(self.phase, EvictionPhase::Init | EvictionPhase::Cancelled) {
1241            return Err(FrankenError::Internal(format!(
1242                "eviction upload invalid in phase {:?}",
1243                self.phase
1244            )));
1245        }
1246        remote.put_segment(self.segment_id, bytes, self.upload_idempotency_key)?;
1247        self.phase = EvictionPhase::Uploaded;
1248        Ok(())
1249    }
1250
1251    /// Verify step (`segment_stat`).
1252    ///
1253    /// # Errors
1254    ///
1255    /// Returns `FrankenError::Internal` when called from an invalid phase or if
1256    /// the segment is not present remotely.
1257    pub fn verify_remote(&mut self, remote: &InMemoryRemoteStore) -> Result<()> {
1258        if self.phase != EvictionPhase::Uploaded {
1259            return Err(FrankenError::Internal(format!(
1260                "eviction verify invalid in phase {:?}",
1261                self.phase
1262            )));
1263        }
1264        if !remote.has_segment(self.segment_id) {
1265            return Err(FrankenError::Internal(
1266                "segment verification failed: missing in remote store".to_owned(),
1267            ));
1268        }
1269        self.phase = EvictionPhase::Verified;
1270        Ok(())
1271    }
1272
1273    /// Retire local segment after remote verification.
1274    ///
1275    /// # Errors
1276    ///
1277    /// Returns `FrankenError::Internal` when called from an invalid phase.
1278    pub fn retire_local(&mut self) -> Result<()> {
1279        if self.phase != EvictionPhase::Verified {
1280            return Err(FrankenError::Internal(format!(
1281                "eviction retire invalid in phase {:?}",
1282                self.phase
1283            )));
1284        }
1285        self.local_state = LocalSegmentState::Retired;
1286        self.phase = EvictionPhase::Retired;
1287        Ok(())
1288    }
1289
1290    /// Cancel saga; before retire we retain local data for safe replay.
1291    #[must_use]
1292    pub fn cancel(&mut self) -> EvictionCompensation {
1293        if self.phase == EvictionPhase::Retired {
1294            EvictionCompensation::RollbackRequired
1295        } else {
1296            self.phase = EvictionPhase::Cancelled;
1297            self.local_state = LocalSegmentState::Present;
1298            debug!(
1299                bead_id = BEAD_ID,
1300                saga_id = format_key(Some(self.saga.key())),
1301                "eviction saga cancelled; local segment retained"
1302            );
1303            EvictionCompensation::LocalRetained
1304        }
1305    }
1306}
1307
/// Compaction publish saga phase (`write segments -> publish -> update locator`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionPhase {
    /// Saga created; nothing staged remotely.
    Init,
    /// Replacement segments uploaded to the remote store.
    SegmentsStaged,
    /// Compaction manifest uploaded.
    Published,
    /// Locator points at the new output; publication is committed.
    LocatorUpdated,
    /// Saga cancelled; staged remote objects removed.
    Cancelled,
}
1317
/// Compensation outcome when cancelling compaction publication.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionCompensation {
    /// Staged remote segments/manifest were deleted (or nothing was staged).
    RemoteCleaned,
    /// Cancelled after the locator update; requires higher-level rollback.
    RollbackRequired,
}
1324
/// Compaction publication saga skeleton with deterministic compensation.
#[derive(Debug)]
pub struct CompactionPublishSaga {
    /// Owning saga; its key seeds deterministic per-step idempotency keys.
    saga: Saga,
    /// Object id under which the compaction manifest is published.
    manifest_id: ObjectId,
    /// Segments uploaded during staging; removed again on cancellation.
    staged_segments: Vec<ObjectId>,
    /// Current phase; guards each step's entry condition.
    phase: CompactionPhase,
    /// True once the locator points at the new output (the commit point).
    locator_updated: bool,
}
1334
1335impl CompactionPublishSaga {
1336    #[must_use]
1337    pub fn new(saga: Saga, manifest_id: ObjectId) -> Self {
1338        Self {
1339            saga,
1340            manifest_id,
1341            staged_segments: Vec::new(),
1342            phase: CompactionPhase::Init,
1343            locator_updated: false,
1344        }
1345    }
1346
1347    #[must_use]
1348    pub const fn phase(&self) -> CompactionPhase {
1349        self.phase
1350    }
1351
1352    #[must_use]
1353    pub const fn locator_updated(&self) -> bool {
1354        self.locator_updated
1355    }
1356
1357    /// Stage replacement segments for compaction publication.
1358    ///
1359    /// # Errors
1360    ///
1361    /// Returns `FrankenError::OutOfRange` when `segments` is empty, or
1362    /// `FrankenError::Internal` when called from an invalid phase.
1363    pub fn stage_segments(
1364        &mut self,
1365        remote: &mut InMemoryRemoteStore,
1366        segments: &[(ObjectId, Vec<u8>)],
1367    ) -> Result<()> {
1368        if !matches!(
1369            self.phase,
1370            CompactionPhase::Init | CompactionPhase::Cancelled
1371        ) {
1372            return Err(FrankenError::Internal(format!(
1373                "compaction stage invalid in phase {:?}",
1374                self.phase
1375            )));
1376        }
1377        if segments.is_empty() {
1378            return Err(FrankenError::OutOfRange {
1379                what: "compaction_segments".to_owned(),
1380                value: "0".to_owned(),
1381            });
1382        }
1383
1384        self.staged_segments.clear();
1385        for (segment_id, payload) in segments {
1386            let key = derive_step_key(self.saga.key(), *segment_id, b"compaction_segment_put");
1387            remote.put_segment(*segment_id, payload, key)?;
1388            self.staged_segments.push(*segment_id);
1389        }
1390        self.phase = CompactionPhase::SegmentsStaged;
1391        self.locator_updated = false;
1392        Ok(())
1393    }
1394
1395    /// Publish a compaction manifest after segment staging.
1396    ///
1397    /// # Errors
1398    ///
1399    /// Returns `FrankenError::Internal` when called from an invalid phase.
1400    pub fn publish_manifest(
1401        &mut self,
1402        remote: &mut InMemoryRemoteStore,
1403        manifest: &[u8],
1404    ) -> Result<()> {
1405        if self.phase != CompactionPhase::SegmentsStaged {
1406            return Err(FrankenError::Internal(format!(
1407                "compaction publish invalid in phase {:?}",
1408                self.phase
1409            )));
1410        }
1411        let key = derive_step_key(
1412            self.saga.key(),
1413            self.manifest_id,
1414            b"compaction_manifest_publish",
1415        );
1416        remote.put_segment(self.manifest_id, manifest, key)?;
1417        self.phase = CompactionPhase::Published;
1418        Ok(())
1419    }
1420
1421    /// Update locators/manifests to point at the newly published compaction output.
1422    ///
1423    /// # Errors
1424    ///
1425    /// Returns `FrankenError::Internal` when called from an invalid phase.
1426    pub fn update_locator(&mut self) -> Result<()> {
1427        if self.phase != CompactionPhase::Published {
1428            return Err(FrankenError::Internal(format!(
1429                "compaction locator update invalid in phase {:?}",
1430                self.phase
1431            )));
1432        }
1433        self.locator_updated = true;
1434        self.phase = CompactionPhase::LocatorUpdated;
1435        Ok(())
1436    }
1437
1438    /// Cancel compaction publication.
1439    ///
1440    /// Before locator update, deterministic compensation removes staged remote
1441    /// objects and leaves local locator state unchanged.
1442    #[must_use]
1443    pub fn cancel(&mut self, remote: &mut InMemoryRemoteStore) -> CompactionCompensation {
1444        match self.phase {
1445            CompactionPhase::LocatorUpdated => CompactionCompensation::RollbackRequired,
1446            CompactionPhase::Init | CompactionPhase::Cancelled => {
1447                self.phase = CompactionPhase::Cancelled;
1448                self.locator_updated = false;
1449                CompactionCompensation::RemoteCleaned
1450            }
1451            CompactionPhase::SegmentsStaged | CompactionPhase::Published => {
1452                for segment in &self.staged_segments {
1453                    let _ = remote.remove_segment(*segment);
1454                }
1455                let _ = remote.remove_segment(self.manifest_id);
1456                self.phase = CompactionPhase::Cancelled;
1457                self.locator_updated = false;
1458                debug!(
1459                    bead_id = BEAD_ID,
1460                    saga_id = format_key(Some(self.saga.key())),
1461                    "compaction saga cancelled; remote staged objects cleaned"
1462                );
1463                CompactionCompensation::RemoteCleaned
1464            }
1465        }
1466    }
1467}
1468
1469#[must_use]
1470fn derive_step_key(
1471    base_key: IdempotencyKey,
1472    object_id: ObjectId,
1473    step_tag: &[u8],
1474) -> IdempotencyKey {
1475    let mut bytes = Vec::with_capacity(16 + 16 + step_tag.len());
1476    bytes.extend_from_slice(base_key.as_bytes());
1477    bytes.extend_from_slice(object_id.as_bytes());
1478    bytes.extend_from_slice(step_tag);
1479    derive_idempotency_key(&bytes)
1480}
1481
1482#[must_use]
1483fn format_key(key: Option<IdempotencyKey>) -> String {
1484    key.map_or_else(|| "-".to_owned(), |k| hex16(k.as_bytes()))
1485}
1486
1487#[must_use]
1488fn format_saga(saga: Option<Saga>) -> String {
1489    saga.map_or_else(|| "-".to_owned(), |s| hex16(s.key().as_bytes()))
1490}
1491
/// Lowercase-hex encode 16 bytes into a 32-character string.
#[must_use]
fn hex16(bytes: &[u8; 16]) -> String {
    bytes.iter().map(|byte| format!("{byte:02x}")).collect()
}
1501
/// Drop-guarded bookkeeping for one queued fallback-bulkhead waiter slot.
///
/// NOTE(review): the matching increment appears to happen in
/// `reserve_fallback_waiter` before construction — confirm at call sites.
#[cfg(not(feature = "native"))]
#[derive(Debug)]
struct FallbackQueueReservation<'a> {
    /// Shared count of queued waiters; decremented exactly once on
    /// release/drop of this reservation.
    queued_waiters: &'a AtomicUsize,
    /// Set once the decrement has happened, preventing a double-decrement.
    released: bool,
}
1508
1509#[cfg(not(feature = "native"))]
1510impl FallbackQueueReservation<'_> {
1511    fn new(queued_waiters: &AtomicUsize) -> FallbackQueueReservation<'_> {
1512        FallbackQueueReservation {
1513            queued_waiters,
1514            released: false,
1515        }
1516    }
1517
1518    fn release(mut self) {
1519        if !self.released {
1520            self.queued_waiters.fetch_sub(1, Ordering::AcqRel);
1521            self.released = true;
1522        }
1523    }
1524}
1525
1526#[cfg(not(feature = "native"))]
1527impl Drop for FallbackQueueReservation<'_> {
1528    fn drop(&mut self) {
1529        if !self.released {
1530            self.queued_waiters.fetch_sub(1, Ordering::AcqRel);
1531            self.released = true;
1532        }
1533    }
1534}
1535
1536#[cfg(test)]
1537mod tests {
1538    use std::sync::Arc;
1539    use std::sync::atomic::{AtomicUsize, Ordering};
1540    use std::thread;
1541    use std::time::Duration;
1542    #[cfg(not(feature = "native"))]
1543    use std::time::Instant;
1544
1545    use super::*;
1546
1547    fn cap_token(seed: u8) -> RemoteCap {
1548        RemoteCap::from_bytes([seed; 16])
1549    }
1550
1551    fn segment_id(seed: u8) -> ObjectId {
1552        ObjectId::from_bytes([seed; 16])
1553    }
1554
1555    #[cfg(not(feature = "native"))]
1556    fn wait_for(condition: impl Fn() -> bool) {
1557        let deadline = Instant::now() + Duration::from_millis(250);
1558        while Instant::now() < deadline {
1559            if condition() {
1560                return;
1561            }
1562            thread::sleep(Duration::from_millis(1));
1563        }
1564        assert!(condition(), "timed out waiting for condition");
1565    }
1566
    // Remote execution is explicitly capability-gated: passing `None` for
    // `remote_cap` must fail before the operation closure ever runs.
    #[test]
    fn test_remote_cap_required_for_network_io() {
        let cx = Cx::<cap::All>::new();
        let registry = ComputationRegistry::default();
        let executor = Executor::with_max_in_flight(1).unwrap();
        let computation = NamedComputation::new(ComputationName::SegmentStat, vec![1, 2, 3]);
        let trace = TraceContext::default();

        let err = executor
            .execute(&cx, None, &registry, &computation, &trace, || {
                Ok(vec![0xAA])
            })
            .unwrap_err();

        // A missing RemoteCap maps to FrankenError::Internal (see `execute`).
        assert!(matches!(err, FrankenError::Internal(_)));
    }
1583
    // Even with lab/deterministic-replay trace fields set (lab_seed +
    // schedule_fingerprint), omitting the RemoteCap must fail cleanly rather
    // than bypass the capability gate.
    #[test]
    fn test_remote_cap_omitted_in_lab_fails_gracefully() {
        let cx = Cx::<cap::All>::new();
        let registry = ComputationRegistry::default();
        let executor = Executor::with_max_in_flight(1).unwrap();
        let computation = NamedComputation::new(ComputationName::SegmentStat, vec![0xAA]);
        let trace = TraceContext {
            trace_id: "lab-no-remote".to_owned(),
            lab_seed: Some(17),
            schedule_fingerprint: Some("sched-A".to_owned()),
            ..TraceContext::default()
        };

        let err = executor
            .execute(&cx, None, &registry, &computation, &trace, || {
                Ok(vec![0xBB])
            })
            .unwrap_err();
        assert!(matches!(err, FrankenError::Internal(_)));
    }
1604
    // Remote dispatch is by registered name only (no closure shipping): the
    // registry accepts registered names and rejects unknown custom names.
    #[test]
    fn test_named_computation_registry_and_unregistered_rejection() {
        let mut registry = ComputationRegistry::new_empty();
        registry.register(ComputationName::SymbolGetRange);
        registry.register(ComputationName::SymbolPutBatch);
        registry.register(ComputationName::SegmentPut);
        registry.register(ComputationName::SegmentStat);

        assert!(registry.validate(&ComputationName::SegmentPut).is_ok());
        assert!(
            registry
                .validate(&ComputationName::Custom("unregistered".to_owned()))
                .is_err()
        );
    }
1620
    // Canonical request bytes must be byte-stable across calls and must embed
    // the remote idempotency domain separator.
    #[test]
    fn test_named_computation_no_closure_shipping_canonical_bytes_deterministic() {
        let computation = NamedComputation::new(
            ComputationName::SymbolGetRange,
            b"obj=01;esi=0..7;epoch=2".to_vec(),
        );
        let bytes_a = computation.canonical_request_bytes().unwrap();
        let bytes_b = computation.canonical_request_bytes().unwrap();
        assert_eq!(bytes_a, bytes_b);
        // The domain string must appear somewhere in the canonical encoding.
        let domain = REMOTE_IDEMPOTENCY_DOMAIN.as_bytes();
        assert!(bytes_a.windows(domain.len()).any(|window| window == domain));
    }
1633
1634    #[test]
1635    fn test_idempotency_key_deterministic() {
1636        let request = b"segment_put:abc";
1637        let key_a = derive_idempotency_key(request);
1638        let key_b = derive_idempotency_key(request);
1639        assert_eq!(key_a, key_b);
1640    }
1641
    // Same key + same request bytes: the second call must replay the stored
    // first response instead of recording a new one.
    #[test]
    fn test_idempotency_dedup_same_key_same_input() {
        let store = IdempotencyStore::new();
        let computation = ComputationName::SegmentPut;
        let request = b"segment_put:id=1";
        let key = derive_idempotency_key(request);

        let first = store
            .register_or_replay(key, &computation, request, b"ok:first")
            .unwrap();
        let second = store
            .register_or_replay(key, &computation, request, b"ok:second")
            .unwrap();

        assert!(matches!(first, IdempotencyDecision::StoredNew(_)));
        // "ok:second" is discarded; the originally stored response wins.
        assert_eq!(second, IdempotencyDecision::Replayed(b"ok:first".to_vec()));
    }
1659
    // Same key but different request bytes is a contract violation and must
    // surface as an error rather than silently replaying or overwriting.
    #[test]
    fn test_idempotency_conflict_same_key_different_input() {
        let store = IdempotencyStore::new();
        let computation = ComputationName::SegmentPut;
        let first_request = b"segment_put:id=1";
        let second_request = b"segment_put:id=2";
        let key = derive_idempotency_key(first_request);

        let _ = store
            .register_or_replay(key, &computation, first_request, b"ok:first")
            .unwrap();

        let err = store
            .register_or_replay(key, &computation, second_request, b"ok:second")
            .unwrap_err();
        assert!(matches!(err, FrankenError::Internal(_)));
    }
1677
1678    #[test]
1679    fn test_lease_backed_liveness_expiry() {
1680        let trace = TraceContext {
1681            trace_id: "trace-lease".to_owned(),
1682            attempt: 1,
1683            ecs_epoch: 7,
1684            ..TraceContext::default()
1685        };
1686        let handle = LeaseBackedHandle::new(42, 1_000, 100, LeaseEscalation::Retry).unwrap();
1687        let status = handle.evaluate(1_200);
1688        assert_eq!(
1689            status,
1690            LeaseStatus::Expired {
1691                escalation: LeaseEscalation::Retry
1692            }
1693        );
1694        let err = handle.enforce(1_200, &trace).unwrap_err();
1695        assert!(matches!(err, FrankenError::BusyRecovery));
1696    }
1697
    // End-to-end: cancelling an eviction before retire keeps the local copy,
    // and a restarted saga derives the same step key, so the remote store
    // replays (rather than repeats) the upload.
    #[test]
    fn test_e2e_remote_effects_saga_eviction_idempotent_restart() {
        let saga_key = derive_idempotency_key(b"saga:evict:segment-9");
        let saga_id = Saga::new(saga_key);
        let target_segment = segment_id(9);
        let payload = b"segment payload".to_vec();

        let mut remote = InMemoryRemoteStore::new();

        let mut first = EvictionSaga::new(saga_id, target_segment);
        first.upload(&mut remote, &payload).unwrap();
        let compensation = first.cancel();
        assert_eq!(compensation, EvictionCompensation::LocalRetained);
        assert_eq!(first.local_state(), LocalSegmentState::Present);

        // Restart with the same saga identity: upload dedups on the derived key.
        let mut restart = EvictionSaga::new(saga_id, target_segment);
        restart.upload(&mut remote, &payload).unwrap();
        restart.verify_remote(&remote).unwrap();
        restart.retire_local().unwrap();

        assert_eq!(restart.local_state(), LocalSegmentState::Retired);
        // Exactly one physical upload despite two upload() calls.
        assert_eq!(remote.upload_count(target_segment), 1);
    }
1721
1722    #[test]
1723    fn test_remote_bulkhead_concurrency_cap() {
1724        let executor = Arc::new(Executor::with_max_in_flight(2).unwrap());
1725        let registry = Arc::new(ComputationRegistry::default());
1726        let computation = Arc::new(NamedComputation::new(ComputationName::SegmentStat, vec![1]));
1727
1728        let active = Arc::new(AtomicUsize::new(0));
1729        let peak = Arc::new(AtomicUsize::new(0));
1730        let busy = Arc::new(AtomicUsize::new(0));
1731
1732        let start = Arc::new(std::sync::Barrier::new(5));
1733        let mut workers = Vec::new();
1734        for _ in 0..5 {
1735            let exec = Arc::clone(&executor);
1736            let reg = Arc::clone(&registry);
1737            let comp = Arc::clone(&computation);
1738            let active_ctr = Arc::clone(&active);
1739            let peak_ctr = Arc::clone(&peak);
1740            let busy_ctr = Arc::clone(&busy);
1741            let barrier = Arc::clone(&start);
1742            workers.push(thread::spawn(move || {
1743                let cx = Cx::<cap::All>::new();
1744                let trace = TraceContext::default();
1745                barrier.wait();
1746                let result = exec.execute(&cx, Some(cap_token(7)), &reg, &comp, &trace, || {
1747                    let now = active_ctr.fetch_add(1, Ordering::AcqRel) + 1;
1748                    peak_ctr.fetch_max(now, Ordering::AcqRel);
1749                    thread::sleep(Duration::from_millis(40));
1750                    active_ctr.fetch_sub(1, Ordering::AcqRel);
1751                    Ok(vec![1, 2, 3])
1752                });
1753                if matches!(result, Err(FrankenError::Busy)) {
1754                    busy_ctr.fetch_add(1, Ordering::AcqRel);
1755                }
1756            }));
1757        }
1758        for worker in workers {
1759            worker.join().unwrap();
1760        }
1761
1762        assert!(busy.load(Ordering::Acquire) >= 3);
1763        assert!(peak.load(Ordering::Acquire) <= 2);
1764    }
1765
1766    #[test]
1767    fn test_remote_bulkhead_zero_means_auto() {
1768        let expected = conservative_remote_max_in_flight(available_parallelism_or_one());
1769        let executor = Executor::from_pragma_remote_max_in_flight(0).unwrap();
1770        assert_eq!(executor.max_in_flight(), expected);
1771    }
1772
1773    #[cfg(not(feature = "native"))]
1774    #[test]
1775    fn test_remote_bulkhead_fallback_queue_waits_for_capacity() {
1776        let executor = Arc::new(
1777            Executor::with_limits("fallback.test", 1, 1, Duration::from_millis(100)).unwrap(),
1778        );
1779        let held = executor.try_acquire_for_testing().unwrap();
1780
1781        let exec = Arc::clone(&executor);
1782        let waiter = thread::spawn(move || {
1783            let cx = Cx::<cap::All>::new();
1784            let permit = exec.acquire(&cx, "segment_stat", None, None, 0).unwrap();
1785            drop(permit);
1786        });
1787
1788        wait_for(|| executor.snapshot().queue_depth == 1);
1789        assert_eq!(executor.snapshot().active_permits, 1);
1790
1791        drop(held);
1792        waiter.join().unwrap();
1793
1794        let snapshot = executor.snapshot();
1795        assert_eq!(snapshot.queue_depth, 0);
1796        assert_eq!(snapshot.total_rejected, 0);
1797        assert_eq!(snapshot.total_cancelled, 0);
1798    }
1799
1800    #[cfg(not(feature = "native"))]
1801    #[test]
1802    fn test_remote_bulkhead_fallback_cancellation_releases_queue_slot() {
1803        let executor = Arc::new(
1804            Executor::with_limits("fallback.test", 1, 1, Duration::from_millis(100)).unwrap(),
1805        );
1806        let held = executor.try_acquire_for_testing().unwrap();
1807        let cx = Cx::<cap::All>::new();
1808        let waiter_cx = cx.clone();
1809
1810        let exec = Arc::clone(&executor);
1811        let waiter = thread::spawn(move || {
1812            let err = exec
1813                .acquire(&waiter_cx, "segment_stat", None, None, 0)
1814                .unwrap_err();
1815            assert!(matches!(err, FrankenError::Busy));
1816        });
1817
1818        wait_for(|| executor.snapshot().queue_depth == 1);
1819        cx.cancel();
1820        waiter.join().unwrap();
1821        drop(held);
1822
1823        let snapshot = executor.snapshot();
1824        assert_eq!(snapshot.queue_depth, 0);
1825        assert_eq!(snapshot.total_cancelled, 1);
1826    }
1827
1828    #[test]
1829    fn test_compaction_publish_saga_forward() {
1830        let saga_key = derive_idempotency_key(b"saga:compaction:forward");
1831        let saga_id = Saga::new(saga_key);
1832        let manifest_id = segment_id(99);
1833        let mut remote = InMemoryRemoteStore::new();
1834
1835        let mut saga = CompactionPublishSaga::new(saga_id, manifest_id);
1836        let segments = vec![
1837            (segment_id(11), b"seg-11".to_vec()),
1838            (segment_id(12), b"seg-12".to_vec()),
1839        ];
1840
1841        saga.stage_segments(&mut remote, &segments).unwrap();
1842        saga.publish_manifest(&mut remote, b"manifest-v2").unwrap();
1843        saga.update_locator().unwrap();
1844
1845        assert_eq!(saga.phase(), CompactionPhase::LocatorUpdated);
1846        assert!(saga.locator_updated());
1847        assert!(remote.has_segment(segment_id(11)));
1848        assert!(remote.has_segment(segment_id(12)));
1849        assert!(remote.has_segment(manifest_id));
1850    }
1851
1852    #[test]
1853    fn test_compaction_publish_saga_compensation_then_restart_idempotent() {
1854        let saga_key = derive_idempotency_key(b"saga:compaction:restart");
1855        let saga_id = Saga::new(saga_key);
1856        let manifest_id = segment_id(101);
1857        let seg_a = segment_id(21);
1858        let seg_b = segment_id(22);
1859        let mut remote = InMemoryRemoteStore::new();
1860        let segments = vec![(seg_a, b"seg-21".to_vec()), (seg_b, b"seg-22".to_vec())];
1861
1862        let mut first = CompactionPublishSaga::new(saga_id, manifest_id);
1863        first.stage_segments(&mut remote, &segments).unwrap();
1864        first.publish_manifest(&mut remote, b"manifest-v3").unwrap();
1865        let compensation = first.cancel(&mut remote);
1866        assert_eq!(compensation, CompactionCompensation::RemoteCleaned);
1867        assert!(!remote.has_segment(seg_a));
1868        assert!(!remote.has_segment(seg_b));
1869        assert!(!remote.has_segment(manifest_id));
1870
1871        let mut restart = CompactionPublishSaga::new(saga_id, manifest_id);
1872        restart.stage_segments(&mut remote, &segments).unwrap();
1873        restart
1874            .publish_manifest(&mut remote, b"manifest-v3")
1875            .unwrap();
1876        restart.update_locator().unwrap();
1877
1878        assert_eq!(restart.phase(), CompactionPhase::LocatorUpdated);
1879        assert!(restart.locator_updated());
1880        assert!(remote.has_segment(seg_a));
1881        assert!(remote.has_segment(seg_b));
1882        assert!(remote.has_segment(manifest_id));
1883        assert_eq!(remote.upload_count(seg_a), 1);
1884        assert_eq!(remote.upload_count(seg_b), 1);
1885    }
1886}