1use std::collections::{HashMap, HashSet};
11use std::sync::Mutex;
12#[cfg(not(feature = "native"))]
13use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
14use std::thread;
15use std::time::Duration;
16#[cfg(not(feature = "native"))]
17use std::time::Instant;
18#[cfg(feature = "native")]
19use std::time::{SystemTime, UNIX_EPOCH};
20
21#[cfg(feature = "native")]
22use asupersync::combinator::bulkhead::{
23 Bulkhead as AdmissionBulkhead, BulkheadError as AdmissionBulkheadError,
24 BulkheadMetrics as AdmissionBulkheadMetrics, BulkheadPermit as AdmissionPermit, BulkheadPolicy,
25};
26#[cfg(feature = "native")]
27use asupersync::types::Time as AdmissionTime;
28use blake3::Hasher;
29use fsqlite_error::{FrankenError, Result};
30use fsqlite_types::cx::{Cx, cap};
31use fsqlite_types::{IdempotencyKey, ObjectId, RemoteCap, Saga};
32use tracing::{debug, info, warn};
33
34#[cfg(not(feature = "native"))]
35use crate::Bulkhead as AdmissionBulkhead;
36#[cfg(feature = "native")]
37use crate::available_parallelism_or_one;
38#[cfg(not(feature = "native"))]
39use crate::{
40 BulkheadConfig, BulkheadPermit as AdmissionPermit, OverflowPolicy, available_parallelism_or_one,
41};
42
43const BEAD_ID: &str = "bd-numl";
44const MAX_BALANCED_REMOTE_IN_FLIGHT: usize = 8;
45const REMOTE_EFFECTS_EXECUTOR_NAME: &str = "fsqlite.remote_effects";
46#[allow(dead_code)]
47pub(crate) const TIERED_STORAGE_EXECUTOR_NAME: &str = "fsqlite.tiered_storage";
48#[allow(dead_code)]
49const DEFAULT_TIERED_STORAGE_QUEUE_DEPTH: usize = 1;
50#[allow(dead_code)]
51const DEFAULT_TIERED_STORAGE_QUEUE_TIMEOUT: Duration = Duration::from_millis(250);
52const ADMISSION_POLL_INTERVAL: Duration = Duration::from_millis(1);
53
54pub const REMOTE_IDEMPOTENCY_DOMAIN: &str = "fsqlite:remote:v1";
56
/// Name of a remote computation: the four normative operations plus an
/// escape hatch for caller-defined names.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ComputationName {
    /// Fetch a range of symbols ("symbol_get_range").
    SymbolGetRange,
    /// Store a batch of symbols ("symbol_put_batch").
    SymbolPutBatch,
    /// Upload a segment ("segment_put").
    SegmentPut,
    /// Query segment metadata ("segment_stat").
    SegmentStat,
    /// Caller-supplied name; all custom names share canonical tag 0xFF.
    Custom(String),
}
71
72impl ComputationName {
73 #[must_use]
74 pub fn as_str(&self) -> &str {
75 match self {
76 Self::SymbolGetRange => "symbol_get_range",
77 Self::SymbolPutBatch => "symbol_put_batch",
78 Self::SegmentPut => "segment_put",
79 Self::SegmentStat => "segment_stat",
80 Self::Custom(name) => name.as_str(),
81 }
82 }
83
84 #[must_use]
85 fn canonical_tag(&self) -> u8 {
86 match self {
87 Self::SymbolGetRange => 0x01,
88 Self::SymbolPutBatch => 0x02,
89 Self::SegmentPut => 0x03,
90 Self::SegmentStat => 0x04,
91 Self::Custom(_) => 0xFF,
92 }
93 }
94
95 #[must_use]
96 fn canonical_name_bytes(&self) -> Vec<u8> {
97 self.as_str().as_bytes().to_vec()
98 }
99}
100
/// A remote computation request: its name together with the already
/// canonically-encoded input payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NamedComputation {
    /// Which computation is being requested.
    pub name: ComputationName,
    /// Canonical input bytes for the computation (encoding defined by the caller).
    pub input_bytes: Vec<u8>,
}
107
108impl NamedComputation {
109 #[must_use]
110 pub const fn new(name: ComputationName, input_bytes: Vec<u8>) -> Self {
111 Self { name, input_bytes }
112 }
113
114 pub fn canonical_request_bytes(&self) -> Result<Vec<u8>> {
123 let domain = REMOTE_IDEMPOTENCY_DOMAIN.as_bytes();
124 let name_bytes = self.name.canonical_name_bytes();
125
126 let domain_len = u32::try_from(domain.len()).map_err(|_| FrankenError::OutOfRange {
127 what: "remote_domain_len".to_owned(),
128 value: domain.len().to_string(),
129 })?;
130 let name_len = u32::try_from(name_bytes.len()).map_err(|_| FrankenError::OutOfRange {
131 what: "computation_name_len".to_owned(),
132 value: name_bytes.len().to_string(),
133 })?;
134 let input_len =
135 u32::try_from(self.input_bytes.len()).map_err(|_| FrankenError::OutOfRange {
136 what: "computation_input_len".to_owned(),
137 value: self.input_bytes.len().to_string(),
138 })?;
139
140 let mut out = Vec::with_capacity(
141 4 + domain.len() + 1 + 4 + name_bytes.len() + 4 + self.input_bytes.len(),
142 );
143 out.extend_from_slice(&domain_len.to_le_bytes());
144 out.extend_from_slice(domain);
145 out.push(self.name.canonical_tag());
146 out.extend_from_slice(&name_len.to_le_bytes());
147 out.extend_from_slice(&name_bytes);
148 out.extend_from_slice(&input_len.to_le_bytes());
149 out.extend_from_slice(&self.input_bytes);
150 Ok(out)
151 }
152}
153
/// Allow-list of computation names that may be dispatched remotely.
#[derive(Debug, Clone)]
pub struct ComputationRegistry {
    // Set of names accepted by `validate`.
    allowed: HashSet<ComputationName>,
}
159
160impl ComputationRegistry {
161 #[must_use]
162 pub fn new_empty() -> Self {
163 Self {
164 allowed: HashSet::new(),
165 }
166 }
167
168 #[must_use]
169 pub fn with_normative_names() -> Self {
170 let mut registry = Self::new_empty();
171 registry.register(ComputationName::SymbolGetRange);
172 registry.register(ComputationName::SymbolPutBatch);
173 registry.register(ComputationName::SegmentPut);
174 registry.register(ComputationName::SegmentStat);
175 registry
176 }
177
178 pub fn register(&mut self, name: ComputationName) {
179 self.allowed.insert(name);
180 }
181
182 #[must_use]
183 pub fn is_registered(&self, name: &ComputationName) -> bool {
184 self.allowed.contains(name)
185 }
186
187 pub fn validate(&self, name: &ComputationName) -> Result<()> {
193 if self.is_registered(name) {
194 Ok(())
195 } else {
196 Err(FrankenError::Unsupported)
197 }
198 }
199}
200
impl Default for ComputationRegistry {
    /// Equivalent to [`ComputationRegistry::with_normative_names`].
    fn default() -> Self {
        Self::with_normative_names()
    }
}
206
/// Correlation metadata threaded through remote-effect logging.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TraceContext {
    /// Free-form trace identifier emitted with every log line.
    pub trace_id: String,
    /// Saga this effect belongs to, if any.
    pub saga_id: Option<Saga>,
    /// Idempotency key for the effect, when one has been derived.
    pub idempotency_key: Option<IdempotencyKey>,
    /// Retry attempt counter (base not established here — see callers).
    pub attempt: u32,
    pub ecs_epoch: u64,
    /// NOTE(review): presumably a deterministic-simulation ("lab") seed — confirm.
    pub lab_seed: Option<u64>,
    /// NOTE(review): presumably a schedule fingerprint for replay — confirm.
    pub schedule_fingerprint: Option<String>,
}
218
219#[must_use]
222pub fn derive_idempotency_key(request_bytes: &[u8]) -> IdempotencyKey {
223 let mut hasher = Hasher::new();
224 hasher.update(REMOTE_IDEMPOTENCY_DOMAIN.as_bytes());
225 hasher.update(request_bytes);
226 let digest = hasher.finalize();
227 let mut out = [0_u8; 16];
228 out.copy_from_slice(&digest.as_bytes()[..16]);
229 IdempotencyKey::from_bytes(out)
230}
231
232#[must_use]
233fn request_digest(request_bytes: &[u8]) -> [u8; 32] {
234 let mut hasher = Hasher::new();
235 hasher.update(request_bytes);
236 *hasher.finalize().as_bytes()
237}
238
/// Outcome of [`IdempotencyStore::register_or_replay`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IdempotencyDecision {
    /// First sighting of the key: the provided outcome was stored.
    StoredNew(Vec<u8>),
    /// Key seen before with the same request: the stored outcome is returned.
    Replayed(Vec<u8>),
}
245
/// Stored record for one idempotency key.
#[derive(Debug, Clone)]
struct IdempotencyEntry {
    // Computation the key was first registered for.
    computation: ComputationName,
    // BLAKE3 digest of the original request bytes (conflict detection).
    request_digest: [u8; 32],
    // Outcome bytes replayed on subsequent identical requests.
    outcome: Vec<u8>,
}
252
/// In-memory, mutex-guarded idempotency ledger keyed by [`IdempotencyKey`].
#[derive(Debug, Default)]
pub struct IdempotencyStore {
    entries: Mutex<HashMap<IdempotencyKey, IdempotencyEntry>>,
}
258
259impl IdempotencyStore {
260 #[must_use]
261 pub fn new() -> Self {
262 Self::default()
263 }
264
265 pub fn register_or_replay(
272 &self,
273 key: IdempotencyKey,
274 computation: &ComputationName,
275 request_bytes: &[u8],
276 outcome: &[u8],
277 ) -> Result<IdempotencyDecision> {
278 let digest = request_digest(request_bytes);
279 let mut guard = self
280 .entries
281 .lock()
282 .unwrap_or_else(std::sync::PoisonError::into_inner);
283
284 if let Some(existing) = guard.get(&key) {
285 if existing.request_digest == digest && existing.computation == *computation {
286 return Ok(IdempotencyDecision::Replayed(existing.outcome.clone()));
287 }
288 return Err(FrankenError::Internal(
289 "idempotency conflict: same key used for different remote request".to_owned(),
290 ));
291 }
292
293 guard.insert(
294 key,
295 IdempotencyEntry {
296 computation: computation.clone(),
297 request_digest: digest,
298 outcome: outcome.to_vec(),
299 },
300 );
301 drop(guard);
302 Ok(IdempotencyDecision::StoredNew(outcome.to_vec()))
303 }
304}
305
306pub fn require_remote_cap<Caps>(_: &Cx<Caps>, remote_cap: Option<RemoteCap>) -> Result<RemoteCap>
312where
313 Caps: cap::SubsetOf<cap::All> + cap::HasRemote,
314{
315 remote_cap.ok_or_else(|| {
316 FrankenError::Internal("remote capability token missing for remote effect".to_owned())
317 })
318}
319
320#[must_use]
324pub const fn conservative_remote_max_in_flight(parallelism: usize) -> usize {
325 let base = parallelism / 8;
326 if base == 0 {
327 1
328 } else if base > MAX_BALANCED_REMOTE_IN_FLIGHT {
329 MAX_BALANCED_REMOTE_IN_FLIGHT
330 } else {
331 base
332 }
333}
334
/// Point-in-time view of an [`Executor`]'s admission state, used for logging.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct AdmissionSnapshot {
    /// Operations currently holding a permit.
    pub active_permits: usize,
    /// Waiters currently queued for a permit.
    pub queue_depth: usize,
    /// Cumulative admissions rejected (saturation or queue full/timeout).
    pub total_rejected: u64,
    /// Cumulative admissions cancelled while waiting.
    pub total_cancelled: u64,
}
343
#[cfg(feature = "native")]
#[must_use]
fn admission_now() -> AdmissionTime {
    // Wall-clock millis since the Unix epoch. A clock before the epoch
    // degrades to zero, and a u128->u64 overflow saturates instead of
    // panicking.
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let millis = u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX);
    AdmissionTime::from_millis(millis)
}
354
/// Admission-controlled executor for remote effects: a bulkhead capping
/// concurrency, an optional bounded wait queue, and observability counters.
/// Under `feature = "native"` the bulkhead supplies metrics and queueing;
/// otherwise queueing and counters are tracked locally with atomics.
#[derive(Debug)]
pub struct Executor {
    /// Executor name used in logs and the bulkhead policy.
    name: &'static str,
    /// Maximum concurrently admitted operations.
    max_in_flight: usize,
    /// Maximum queued waiters; 0 means reject immediately when saturated.
    max_queue: usize,
    /// How long a queued waiter may wait before timing out.
    queue_timeout: Duration,
    bulkhead: AdmissionBulkhead,
    /// Fallback-only queue-depth counter (native builds read the bulkhead).
    #[cfg(not(feature = "native"))]
    queued_waiters: AtomicUsize,
    /// Fallback-only cumulative rejection counter.
    #[cfg(not(feature = "native"))]
    total_rejected: AtomicU64,
    /// Fallback-only cumulative cancellation counter.
    #[cfg(not(feature = "native"))]
    total_cancelled: AtomicU64,
}
370
impl Executor {
    /// Builds an executor from the remote max-in-flight pragma value:
    /// `0` selects the balanced default sized from available parallelism;
    /// any other value becomes the explicit in-flight cap.
    pub fn from_pragma_remote_max_in_flight(remote_max_in_flight: usize) -> Result<Self> {
        if remote_max_in_flight == 0 {
            Ok(Self::balanced_default())
        } else {
            Self::with_max_in_flight(remote_max_in_flight)
        }
    }

    /// Remote-effects executor with an explicit cap, no queue slots, and a
    /// zero queue timeout (saturation rejects immediately).
    ///
    /// # Errors
    /// Fails when `max_in_flight` is zero or does not fit in a `u32`.
    pub fn with_max_in_flight(max_in_flight: usize) -> Result<Self> {
        Self::with_limits(
            REMOTE_EFFECTS_EXECUTOR_NAME,
            max_in_flight,
            0,
            Duration::ZERO,
        )
    }

    /// Balanced default for remote effects: cap derived from available
    /// parallelism via `conservative_remote_max_in_flight`, which always
    /// returns >= 1, so the fallible constructor cannot fail here.
    #[must_use]
    pub fn balanced_default() -> Self {
        let p = available_parallelism_or_one();
        let max_in_flight = conservative_remote_max_in_flight(p);
        Self::with_max_in_flight(max_in_flight)
            .expect("remote balanced max_in_flight is always >= 1")
    }

    /// Balanced default for the tiered-storage executor: same cap formula,
    /// plus a single queue slot with a bounded (250 ms) wait.
    #[must_use]
    #[allow(dead_code)]
    pub(crate) fn balanced_tiered_storage_default() -> Self {
        let p = available_parallelism_or_one();
        let max_in_flight = conservative_remote_max_in_flight(p);
        Self::with_limits(
            TIERED_STORAGE_EXECUTOR_NAME,
            max_in_flight,
            DEFAULT_TIERED_STORAGE_QUEUE_DEPTH,
            DEFAULT_TIERED_STORAGE_QUEUE_TIMEOUT,
        )
        .expect("tiered storage balanced max_in_flight is always >= 1")
    }

    /// General constructor. Validates that `max_in_flight` is a nonzero
    /// `u32`, then builds the feature-appropriate bulkhead:
    /// - native: an `asupersync` bulkhead configured with the queue limits;
    /// - fallback: a local bulkhead with no internal queue — queueing is
    ///   emulated in `acquire` via `queued_waiters` (so `max_queue` is not
    ///   passed into `BulkheadConfig` here by design).
    ///
    /// # Errors
    /// `FrankenError::OutOfRange` for a zero/oversized cap or (native) an
    /// oversized queue depth.
    pub(crate) fn with_limits(
        name: &'static str,
        max_in_flight: usize,
        max_queue: usize,
        queue_timeout: Duration,
    ) -> Result<Self> {
        let max_in_flight_u32 =
            u32::try_from(max_in_flight).map_err(|_| FrankenError::OutOfRange {
                what: format!("{name}.max_in_flight"),
                value: max_in_flight.to_string(),
            })?;
        if max_in_flight_u32 == 0 {
            return Err(FrankenError::OutOfRange {
                what: format!("{name}.max_in_flight"),
                value: max_in_flight.to_string(),
            });
        }
        #[cfg(feature = "native")]
        let bulkhead = {
            let max_queue_u32 = u32::try_from(max_queue).map_err(|_| FrankenError::OutOfRange {
                what: format!("{name}.max_queue"),
                value: max_queue.to_string(),
            })?;
            AdmissionBulkhead::new(BulkheadPolicy {
                name: name.to_owned(),
                max_concurrent: max_in_flight_u32,
                max_queue: max_queue_u32,
                queue_timeout,
                weighted: false,
                on_full: None,
            })
        };
        #[cfg(not(feature = "native"))]
        let bulkhead = {
            let config = BulkheadConfig::new(max_in_flight, 0, OverflowPolicy::DropBusy)
                .ok_or_else(|| FrankenError::OutOfRange {
                    what: format!("{name}.max_in_flight"),
                    value: max_in_flight.to_string(),
                })?;
            AdmissionBulkhead::new(config)
        };

        Ok(Self {
            name,
            max_in_flight,
            max_queue,
            queue_timeout,
            bulkhead,
            #[cfg(not(feature = "native"))]
            queued_waiters: AtomicUsize::new(0),
            #[cfg(not(feature = "native"))]
            total_rejected: AtomicU64::new(0),
            #[cfg(not(feature = "native"))]
            total_cancelled: AtomicU64::new(0),
        })
    }

    /// Executor name (used in logs).
    #[must_use]
    pub const fn name(&self) -> &'static str {
        self.name
    }

    /// Configured concurrency cap.
    #[must_use]
    pub const fn max_in_flight(&self) -> usize {
        self.max_in_flight
    }

    /// Configured queue depth (0 = no queueing).
    #[must_use]
    pub const fn max_queue(&self) -> usize {
        self.max_queue
    }

    /// Current admission state: from bulkhead metrics on native builds,
    /// from the local atomic counters on the fallback build.
    #[must_use]
    pub fn snapshot(&self) -> AdmissionSnapshot {
        #[cfg(feature = "native")]
        {
            let metrics: AdmissionBulkheadMetrics = self.bulkhead.metrics();
            AdmissionSnapshot {
                #[allow(clippy::cast_possible_truncation)]
                active_permits: metrics.active_permits as usize,
                #[allow(clippy::cast_possible_truncation)]
                queue_depth: metrics.queue_depth as usize,
                total_rejected: metrics.total_rejected,
                total_cancelled: metrics.total_cancelled,
            }
        }
        #[cfg(not(feature = "native"))]
        {
            AdmissionSnapshot {
                active_permits: self.bulkhead.in_flight(),
                queue_depth: self.queued_waiters.load(Ordering::Acquire),
                total_rejected: self.total_rejected.load(Ordering::Acquire),
                total_cancelled: self.total_cancelled.load(Ordering::Acquire),
            }
        }
    }

    /// Fallback build only: atomically claim one of the `max_queue` waiter
    /// slots. Returns `false` when the queue is already full. Uses a CAS
    /// loop so concurrent claimers cannot overshoot the limit.
    #[cfg(not(feature = "native"))]
    fn reserve_fallback_waiter(&self) -> bool {
        loop {
            let current = self.queued_waiters.load(Ordering::Acquire);
            if current >= self.max_queue {
                return false;
            }
            let next = current + 1;
            if self
                .queued_waiters
                .compare_exchange_weak(current, next, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return true;
            }
        }
    }

    /// Test hook: non-blocking permit acquisition with no logging.
    #[cfg(test)]
    #[allow(dead_code)]
    pub(crate) fn try_acquire_for_testing(&self) -> Option<AdmissionPermit<'_>> {
        #[cfg(feature = "native")]
        {
            self.bulkhead.try_acquire(1)
        }
        #[cfg(not(feature = "native"))]
        {
            self.bulkhead.try_acquire().ok()
        }
    }

    /// Runs `operation` under admission control: acquires a permit (possibly
    /// waiting in the queue), re-checks cancellation after admission, then
    /// invokes the operation while holding the permit, logging the outcome.
    ///
    /// # Errors
    /// `FrankenError::Busy` for rejection/timeout/cancellation, otherwise
    /// whatever `operation` returns.
    pub(crate) fn run<Caps, T, F>(
        &self,
        cx: &Cx<Caps>,
        effect_name: &str,
        saga: Option<Saga>,
        idempotency_key: Option<IdempotencyKey>,
        ecs_epoch: u64,
        operation: F,
    ) -> Result<T>
    where
        Caps: cap::SubsetOf<cap::All>,
        F: FnOnce() -> Result<T>,
    {
        // Permit held for the whole call; presumably released on drop — the
        // permit type comes from the bulkhead implementation.
        let _permit = self.acquire(cx, effect_name, saga, idempotency_key, ecs_epoch)?;
        // Cancellation may have been requested between admission and
        // dispatch; surface it as Busy before doing any remote work.
        cx.checkpoint().map_err(|_| {
            let snapshot = self.snapshot();
            warn!(
                bead_id = BEAD_ID,
                executor = self.name,
                effect_name,
                saga_id = format_saga(saga),
                idempotency_key = format_key(idempotency_key),
                ecs_epoch,
                admission = "cancelled_post_acquire",
                active_permits = snapshot.active_permits,
                queue_depth = snapshot.queue_depth,
                total_cancelled = snapshot.total_cancelled,
                "remote operation cancelled after admission and before dispatch"
            );
            FrankenError::Busy
        })?;
        match operation() {
            Ok(out) => {
                let snapshot = self.snapshot();
                info!(
                    bead_id = BEAD_ID,
                    executor = self.name,
                    effect_name,
                    saga_id = format_saga(saga),
                    idempotency_key = format_key(idempotency_key),
                    ecs_epoch,
                    active_permits = snapshot.active_permits,
                    queue_depth = snapshot.queue_depth,
                    "remote operation completed under admission control"
                );
                Ok(out)
            }
            Err(err) => {
                let snapshot = self.snapshot();
                warn!(
                    bead_id = BEAD_ID,
                    executor = self.name,
                    effect_name,
                    saga_id = format_saga(saga),
                    idempotency_key = format_key(idempotency_key),
                    ecs_epoch,
                    active_permits = snapshot.active_permits,
                    queue_depth = snapshot.queue_depth,
                    error = %err,
                    "remote operation failed under admission control"
                );
                Err(err)
            }
        }
    }

    /// Acquires an admission permit. Fast path: immediate `try_acquire`.
    /// Saturated with `max_queue == 0`: reject. Otherwise queue and poll
    /// every `ADMISSION_POLL_INTERVAL`, honoring cancellation via
    /// `cx.checkpoint()` and the configured queue timeout. The native and
    /// fallback builds implement the same protocol against different
    /// bulkheads.
    fn acquire<Caps>(
        &self,
        cx: &Cx<Caps>,
        effect_name: &str,
        saga: Option<Saga>,
        idempotency_key: Option<IdempotencyKey>,
        ecs_epoch: u64,
    ) -> Result<AdmissionPermit<'_>>
    where
        Caps: cap::SubsetOf<cap::All>,
    {
        // Never enter admission at all if already cancelled.
        cx.checkpoint().map_err(|_| FrankenError::Busy)?;

        #[cfg(feature = "native")]
        {
            if let Some(permit) = self.bulkhead.try_acquire(1) {
                let snapshot = self.snapshot();
                debug!(
                    bead_id = BEAD_ID,
                    executor = self.name,
                    effect_name,
                    saga_id = format_saga(saga),
                    idempotency_key = format_key(idempotency_key),
                    ecs_epoch,
                    admission = "immediate",
                    active_permits = snapshot.active_permits,
                    queue_depth = snapshot.queue_depth,
                    max_in_flight = self.max_in_flight,
                    max_queue = self.max_queue,
                    "remote admission granted immediately"
                );
                return Ok(permit);
            }

            let snapshot = self.snapshot();
            // No queue configured: saturation is an immediate rejection.
            if self.max_queue == 0 {
                warn!(
                    bead_id = BEAD_ID,
                    executor = self.name,
                    effect_name,
                    saga_id = format_saga(saga),
                    idempotency_key = format_key(idempotency_key),
                    ecs_epoch,
                    admission = "rejected",
                    active_permits = snapshot.active_permits,
                    queue_depth = snapshot.queue_depth,
                    total_rejected = snapshot.total_rejected,
                    "remote admission saturated"
                );
                return Err(FrankenError::Busy);
            }

            let queued_at = admission_now();
            let entry_id = match self.bulkhead.enqueue(1, queued_at) {
                Ok(entry_id) => entry_id,
                Err(AdmissionBulkheadError::Full | AdmissionBulkheadError::QueueFull) => {
                    let snapshot = self.snapshot();
                    warn!(
                        bead_id = BEAD_ID,
                        executor = self.name,
                        effect_name,
                        saga_id = format_saga(saga),
                        idempotency_key = format_key(idempotency_key),
                        ecs_epoch,
                        admission = "rejected",
                        active_permits = snapshot.active_permits,
                        queue_depth = snapshot.queue_depth,
                        total_rejected = snapshot.total_rejected,
                        "remote admission queue full"
                    );
                    return Err(FrankenError::Busy);
                }
                Err(
                    AdmissionBulkheadError::QueueTimeout { .. } | AdmissionBulkheadError::Cancelled,
                ) => {
                    return Err(FrankenError::Busy);
                }
                // Inner error type is `()`; this variant cannot carry a real error.
                Err(AdmissionBulkheadError::Inner(())) => unreachable!(),
            };

            let snapshot = self.snapshot();
            debug!(
                bead_id = BEAD_ID,
                executor = self.name,
                effect_name,
                saga_id = format_saga(saga),
                idempotency_key = format_key(idempotency_key),
                ecs_epoch,
                admission = "queued",
                entry_id,
                queue_depth = snapshot.queue_depth,
                queue_timeout_ms = self.queue_timeout.as_millis(),
                "remote admission queued"
            );

            // Poll loop: cancellation check, then ask the bulkhead whether
            // our entry has been granted, timed out, or must keep waiting.
            loop {
                if cx.checkpoint().is_err() {
                    // Remove our queue entry so it cannot be granted later.
                    self.bulkhead.cancel_entry(entry_id, AdmissionTime::ZERO);
                    let snapshot = self.snapshot();
                    warn!(
                        bead_id = BEAD_ID,
                        executor = self.name,
                        effect_name,
                        saga_id = format_saga(saga),
                        idempotency_key = format_key(idempotency_key),
                        ecs_epoch,
                        admission = "cancelled",
                        entry_id,
                        queue_depth = snapshot.queue_depth,
                        total_cancelled = snapshot.total_cancelled,
                        "remote admission cancelled while waiting"
                    );
                    return Err(FrankenError::Busy);
                }

                let now = admission_now();
                match self.bulkhead.check_entry(entry_id, now) {
                    Ok(Some(permit)) => {
                        let waited_ms = now.as_millis().saturating_sub(queued_at.as_millis());
                        let snapshot = self.snapshot();
                        debug!(
                            bead_id = BEAD_ID,
                            executor = self.name,
                            effect_name,
                            saga_id = format_saga(saga),
                            idempotency_key = format_key(idempotency_key),
                            ecs_epoch,
                            admission = "dequeued",
                            entry_id,
                            waited_ms,
                            active_permits = snapshot.active_permits,
                            queue_depth = snapshot.queue_depth,
                            "remote admission granted from queue"
                        );
                        return Ok(permit);
                    }
                    // Still queued: back off briefly before re-checking.
                    Ok(None) => thread::sleep(ADMISSION_POLL_INTERVAL),
                    Err(AdmissionBulkheadError::QueueTimeout { waited }) => {
                        let snapshot = self.snapshot();
                        warn!(
                            bead_id = BEAD_ID,
                            executor = self.name,
                            effect_name,
                            saga_id = format_saga(saga),
                            idempotency_key = format_key(idempotency_key),
                            ecs_epoch,
                            admission = "timed_out",
                            entry_id,
                            waited_ms = waited.as_millis(),
                            queue_depth = snapshot.queue_depth,
                            total_rejected = snapshot.total_rejected,
                            "remote admission queue timeout"
                        );
                        return Err(FrankenError::Busy);
                    }
                    Err(
                        AdmissionBulkheadError::Cancelled
                        | AdmissionBulkheadError::Full
                        | AdmissionBulkheadError::QueueFull,
                    ) => {
                        return Err(FrankenError::Busy);
                    }
                    Err(AdmissionBulkheadError::Inner(())) => unreachable!(),
                }
            }
        }

        #[cfg(not(feature = "native"))]
        {
            if let Ok(permit) = self.bulkhead.try_acquire() {
                let snapshot = self.snapshot();
                debug!(
                    bead_id = BEAD_ID,
                    executor = self.name,
                    effect_name,
                    saga_id = format_saga(saga),
                    idempotency_key = format_key(idempotency_key),
                    ecs_epoch,
                    admission = "immediate",
                    active_permits = snapshot.active_permits,
                    queue_depth = snapshot.queue_depth,
                    max_in_flight = self.max_in_flight,
                    max_queue = self.max_queue,
                    "remote admission granted via local fallback bulkhead"
                );
                return Ok(permit);
            }

            if self.max_queue == 0 {
                self.total_rejected.fetch_add(1, Ordering::AcqRel);
                let snapshot = self.snapshot();
                warn!(
                    bead_id = BEAD_ID,
                    executor = self.name,
                    effect_name,
                    saga_id = format_saga(saga),
                    idempotency_key = format_key(idempotency_key),
                    ecs_epoch,
                    admission = "rejected",
                    active_permits = snapshot.active_permits,
                    queue_depth = snapshot.queue_depth,
                    total_rejected = snapshot.total_rejected,
                    "remote admission saturated"
                );
                return Err(FrankenError::Busy);
            }

            // Claim a local waiter slot; failure means the queue is full.
            if !self.reserve_fallback_waiter() {
                self.total_rejected.fetch_add(1, Ordering::AcqRel);
                let snapshot = self.snapshot();
                warn!(
                    bead_id = BEAD_ID,
                    executor = self.name,
                    effect_name,
                    saga_id = format_saga(saga),
                    idempotency_key = format_key(idempotency_key),
                    ecs_epoch,
                    admission = "rejected",
                    active_permits = snapshot.active_permits,
                    queue_depth = snapshot.queue_depth,
                    total_rejected = snapshot.total_rejected,
                    "remote admission queue full"
                );
                return Err(FrankenError::Busy);
            }

            let queued_at = Instant::now();
            // Guard object tied to the reserved slot; released on every exit
            // path below so queued_waiters never leaks a count.
            let queued = FallbackQueueReservation::new(&self.queued_waiters);
            let snapshot = self.snapshot();
            debug!(
                bead_id = BEAD_ID,
                executor = self.name,
                effect_name,
                saga_id = format_saga(saga),
                idempotency_key = format_key(idempotency_key),
                ecs_epoch,
                admission = "queued",
                queue_depth = snapshot.queue_depth,
                queue_timeout_ms = self.queue_timeout.as_millis(),
                "remote admission queued via local fallback bulkhead"
            );

            loop {
                if cx.checkpoint().is_err() {
                    self.total_cancelled.fetch_add(1, Ordering::AcqRel);
                    queued.release();
                    let snapshot = self.snapshot();
                    warn!(
                        bead_id = BEAD_ID,
                        executor = self.name,
                        effect_name,
                        saga_id = format_saga(saga),
                        idempotency_key = format_key(idempotency_key),
                        ecs_epoch,
                        admission = "cancelled",
                        queue_depth = snapshot.queue_depth,
                        total_cancelled = snapshot.total_cancelled,
                        "remote admission cancelled while waiting"
                    );
                    return Err(FrankenError::Busy);
                }

                match self.bulkhead.try_acquire() {
                    Ok(permit) => {
                        let waited_ms =
                            u64::try_from(queued_at.elapsed().as_millis()).unwrap_or(u64::MAX);
                        queued.release();
                        let snapshot = self.snapshot();
                        debug!(
                            bead_id = BEAD_ID,
                            executor = self.name,
                            effect_name,
                            saga_id = format_saga(saga),
                            idempotency_key = format_key(idempotency_key),
                            ecs_epoch,
                            admission = "dequeued",
                            waited_ms,
                            active_permits = snapshot.active_permits,
                            queue_depth = snapshot.queue_depth,
                            "remote admission granted from local fallback queue"
                        );
                        return Ok(permit);
                    }
                    // Busy means "still saturated": wait unless we exceeded
                    // the configured queue timeout.
                    Err(FrankenError::Busy) => {
                        if queued_at.elapsed() >= self.queue_timeout {
                            self.total_rejected.fetch_add(1, Ordering::AcqRel);
                            let waited_ms =
                                u64::try_from(queued_at.elapsed().as_millis()).unwrap_or(u64::MAX);
                            queued.release();
                            let snapshot = self.snapshot();
                            warn!(
                                bead_id = BEAD_ID,
                                executor = self.name,
                                effect_name,
                                saga_id = format_saga(saga),
                                idempotency_key = format_key(idempotency_key),
                                ecs_epoch,
                                admission = "timed_out",
                                waited_ms,
                                queue_depth = snapshot.queue_depth,
                                total_rejected = snapshot.total_rejected,
                                "remote admission queue timeout"
                            );
                            return Err(FrankenError::Busy);
                        }
                        thread::sleep(ADMISSION_POLL_INTERVAL);
                    }
                    // Any other error from the fallback bulkhead is propagated as-is.
                    Err(err) => return Err(err),
                }
            }
        }
    }

    /// Full dispatch path for a named computation: checks the remote
    /// capability, validates the name against the registry, then runs the
    /// operation under admission control with trace-context logging.
    ///
    /// # Errors
    /// Capability missing, unregistered name, admission failure, or the
    /// operation's own error.
    pub fn execute<Caps, F>(
        &self,
        cx: &Cx<Caps>,
        remote_cap: Option<RemoteCap>,
        registry: &ComputationRegistry,
        computation: &NamedComputation,
        trace: &TraceContext,
        operation: F,
    ) -> Result<Vec<u8>>
    where
        Caps: cap::SubsetOf<cap::All> + cap::HasRemote,
        F: FnOnce() -> Result<Vec<u8>>,
    {
        let _cap = require_remote_cap(cx, remote_cap)?;
        registry.validate(&computation.name)?;
        debug!(
            bead_id = BEAD_ID,
            trace_id = trace.trace_id,
            effect_name = computation.name.as_str(),
            saga_id = format_saga(trace.saga_id),
            idempotency_key = format_key(trace.idempotency_key),
            attempt = trace.attempt,
            ecs_epoch = trace.ecs_epoch,
            lab_seed = ?trace.lab_seed,
            schedule_fingerprint = ?trace.schedule_fingerprint,
            "dispatching named remote computation"
        );
        let out = self.run(
            cx,
            computation.name.as_str(),
            trace.saga_id,
            trace.idempotency_key,
            trace.ecs_epoch,
            operation,
        )?;

        info!(
            bead_id = BEAD_ID,
            trace_id = trace.trace_id,
            effect_name = computation.name.as_str(),
            saga_id = format_saga(trace.saga_id),
            idempotency_key = format_key(trace.idempotency_key),
            attempt = trace.attempt,
            ecs_epoch = trace.ecs_epoch,
            "remote computation completed"
        );

        Ok(out)
    }
}
999
/// What to do when a lease-backed handle is used after its TTL elapsed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseEscalation {
    /// Treat as cancellation (mapped to `FrankenError::Busy`).
    Cancel,
    /// Treat as retryable (mapped to `FrankenError::BusyRecovery`).
    Retry,
    /// Treat as a hard failure (mapped to `FrankenError::LockFailed`).
    Fail,
}
1007
/// Result of evaluating a lease against a timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseStatus {
    /// Lease is still within its TTL.
    Live,
    /// Lease aged past its TTL; carries the configured escalation.
    Expired { escalation: LeaseEscalation },
}
1014
/// A handle whose validity is bounded by a millisecond TTL from issuance.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LeaseBackedHandle {
    pub lease_id: u64,
    /// Issuance timestamp in milliseconds (same clock as `evaluate`'s input).
    pub issued_at_millis: u64,
    /// Time-to-live in milliseconds; always nonzero (enforced by `new`).
    pub ttl_millis: u64,
    /// Escalation applied when the lease is used after expiry.
    pub escalation: LeaseEscalation,
}
1023
1024impl LeaseBackedHandle {
1025 pub fn new(
1031 lease_id: u64,
1032 issued_at_millis: u64,
1033 ttl_millis: u64,
1034 escalation: LeaseEscalation,
1035 ) -> Result<Self> {
1036 if ttl_millis == 0 {
1037 return Err(FrankenError::OutOfRange {
1038 what: "lease_ttl_millis".to_owned(),
1039 value: "0".to_owned(),
1040 });
1041 }
1042 Ok(Self {
1043 lease_id,
1044 issued_at_millis,
1045 ttl_millis,
1046 escalation,
1047 })
1048 }
1049
1050 #[must_use]
1051 pub fn evaluate(&self, now_millis: u64) -> LeaseStatus {
1052 let age_millis = now_millis.saturating_sub(self.issued_at_millis);
1053 if age_millis >= self.ttl_millis {
1054 LeaseStatus::Expired {
1055 escalation: self.escalation,
1056 }
1057 } else {
1058 LeaseStatus::Live
1059 }
1060 }
1061
1062 pub fn enforce(&self, now_millis: u64, trace: &TraceContext) -> Result<()> {
1071 match self.evaluate(now_millis) {
1072 LeaseStatus::Live => Ok(()),
1073 LeaseStatus::Expired { escalation } => {
1074 warn!(
1075 bead_id = BEAD_ID,
1076 trace_id = trace.trace_id,
1077 lease_id = self.lease_id,
1078 effect_name = "lease_expiry",
1079 saga_id = format_saga(trace.saga_id),
1080 idempotency_key = format_key(trace.idempotency_key),
1081 attempt = trace.attempt,
1082 ecs_epoch = trace.ecs_epoch,
1083 escalation = ?escalation,
1084 "remote lease expired; escalating"
1085 );
1086 match escalation {
1087 LeaseEscalation::Cancel => Err(FrankenError::Busy),
1088 LeaseEscalation::Retry => Err(FrankenError::BusyRecovery),
1089 LeaseEscalation::Fail => Err(FrankenError::LockFailed {
1090 detail: "remote lease expired".to_owned(),
1091 }),
1092 }
1093 }
1094 }
1095 }
1096}
1097
/// In-memory stand-in for a remote segment store, with per-key upload
/// idempotency tracking and an upload counter for tests.
#[derive(Debug, Default)]
pub struct InMemoryRemoteStore {
    // Segment payloads by id.
    segments: HashMap<ObjectId, Vec<u8>>,
    // Idempotency ledger: key -> what was uploaded under it.
    uploads: HashMap<IdempotencyKey, UploadRecord>,
    // Number of distinct (non-replayed) uploads per segment.
    upload_count: HashMap<ObjectId, u64>,
}
1105
/// What an idempotency key was first used to upload.
#[derive(Debug, Clone)]
struct UploadRecord {
    segment_id: ObjectId,
    // BLAKE3 digest of the uploaded payload (conflict detection).
    payload_digest: [u8; 32],
}
1111
1112impl InMemoryRemoteStore {
1113 #[must_use]
1114 pub fn new() -> Self {
1115 Self::default()
1116 }
1117
1118 pub fn put_segment(
1125 &mut self,
1126 segment_id: ObjectId,
1127 payload: &[u8],
1128 key: IdempotencyKey,
1129 ) -> Result<()> {
1130 let digest = request_digest(payload);
1131 if let Some(existing) = self.uploads.get(&key) {
1132 if existing.segment_id == segment_id && existing.payload_digest == digest {
1133 self.segments
1136 .entry(segment_id)
1137 .or_insert_with(|| payload.to_vec());
1138 return Ok(());
1139 }
1140 return Err(FrankenError::Internal(
1141 "remote put conflict: idempotency key reused with different payload".to_owned(),
1142 ));
1143 }
1144
1145 self.uploads.insert(
1146 key,
1147 UploadRecord {
1148 segment_id,
1149 payload_digest: digest,
1150 },
1151 );
1152 self.segments.insert(segment_id, payload.to_vec());
1153 *self.upload_count.entry(segment_id).or_insert(0) += 1;
1154 Ok(())
1155 }
1156
1157 #[must_use]
1158 pub fn has_segment(&self, segment_id: ObjectId) -> bool {
1159 self.segments.contains_key(&segment_id)
1160 }
1161
1162 #[must_use]
1163 pub fn upload_count(&self, segment_id: ObjectId) -> u64 {
1164 *self.upload_count.get(&segment_id).unwrap_or(&0)
1165 }
1166
1167 pub fn remove_segment(&mut self, segment_id: ObjectId) -> bool {
1168 self.segments.remove(&segment_id).is_some()
1169 }
1170}
1171
/// Phases of the segment-eviction saga, in forward order:
/// Init -> Uploaded -> Verified -> Retired, with Cancelled as the abort state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvictionPhase {
    Init,
    Uploaded,
    Verified,
    Retired,
    Cancelled,
}
1181
/// Whether the local copy of a segment still exists.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LocalSegmentState {
    Present,
    Retired,
}
1188
/// Compensation required after cancelling an eviction saga.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvictionCompensation {
    /// Local copy was never retired; nothing further to do.
    LocalRetained,
    /// Local copy was already retired; caller must roll back.
    RollbackRequired,
}
1195
/// Saga that uploads a segment remotely, verifies it, then retires the
/// local copy — with a cancel path that reports the needed compensation.
#[derive(Debug)]
pub struct EvictionSaga {
    saga: Saga,
    segment_id: ObjectId,
    phase: EvictionPhase,
    local_state: LocalSegmentState,
    // Derived once at construction so retried uploads reuse the same key.
    upload_idempotency_key: IdempotencyKey,
}
1205
1206impl EvictionSaga {
1207 #[must_use]
1208 pub fn new(saga: Saga, segment_id: ObjectId) -> Self {
1209 Self {
1210 upload_idempotency_key: derive_step_key(saga.key(), segment_id, b"segment_put"),
1211 saga,
1212 segment_id,
1213 phase: EvictionPhase::Init,
1214 local_state: LocalSegmentState::Present,
1215 }
1216 }
1217
1218 #[must_use]
1219 pub const fn phase(&self) -> EvictionPhase {
1220 self.phase
1221 }
1222
1223 #[must_use]
1224 pub const fn local_state(&self) -> LocalSegmentState {
1225 self.local_state
1226 }
1227
1228 #[must_use]
1229 pub const fn upload_idempotency_key(&self) -> IdempotencyKey {
1230 self.upload_idempotency_key
1231 }
1232
1233 pub fn upload(&mut self, remote: &mut InMemoryRemoteStore, bytes: &[u8]) -> Result<()> {
1240 if !matches!(self.phase, EvictionPhase::Init | EvictionPhase::Cancelled) {
1241 return Err(FrankenError::Internal(format!(
1242 "eviction upload invalid in phase {:?}",
1243 self.phase
1244 )));
1245 }
1246 remote.put_segment(self.segment_id, bytes, self.upload_idempotency_key)?;
1247 self.phase = EvictionPhase::Uploaded;
1248 Ok(())
1249 }
1250
1251 pub fn verify_remote(&mut self, remote: &InMemoryRemoteStore) -> Result<()> {
1258 if self.phase != EvictionPhase::Uploaded {
1259 return Err(FrankenError::Internal(format!(
1260 "eviction verify invalid in phase {:?}",
1261 self.phase
1262 )));
1263 }
1264 if !remote.has_segment(self.segment_id) {
1265 return Err(FrankenError::Internal(
1266 "segment verification failed: missing in remote store".to_owned(),
1267 ));
1268 }
1269 self.phase = EvictionPhase::Verified;
1270 Ok(())
1271 }
1272
1273 pub fn retire_local(&mut self) -> Result<()> {
1279 if self.phase != EvictionPhase::Verified {
1280 return Err(FrankenError::Internal(format!(
1281 "eviction retire invalid in phase {:?}",
1282 self.phase
1283 )));
1284 }
1285 self.local_state = LocalSegmentState::Retired;
1286 self.phase = EvictionPhase::Retired;
1287 Ok(())
1288 }
1289
1290 #[must_use]
1292 pub fn cancel(&mut self) -> EvictionCompensation {
1293 if self.phase == EvictionPhase::Retired {
1294 EvictionCompensation::RollbackRequired
1295 } else {
1296 self.phase = EvictionPhase::Cancelled;
1297 self.local_state = LocalSegmentState::Present;
1298 debug!(
1299 bead_id = BEAD_ID,
1300 saga_id = format_key(Some(self.saga.key())),
1301 "eviction saga cancelled; local segment retained"
1302 );
1303 EvictionCompensation::LocalRetained
1304 }
1305 }
1306}
1307
/// Forward phases of a compaction publish saga.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionPhase {
    /// Saga created; nothing uploaded remotely yet.
    Init,
    /// All replacement segments uploaded to the remote store.
    SegmentsStaged,
    /// Manifest describing the staged segments has been published.
    Published,
    /// Locator points at the new manifest; the saga can no longer be
    /// compensated by remote cleanup alone.
    LocatorUpdated,
    /// Saga compensated; staging may be restarted from this phase.
    Cancelled,
}
1317
/// Outcome of cancelling a compaction publish saga.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionCompensation {
    /// Staged remote objects (if any) were removed; saga is `Cancelled`.
    RemoteCleaned,
    /// The locator was already updated, so cancellation requires a rollback.
    RollbackRequired,
}
1324
/// Saga coordinating a compaction publish: stage replacement segments,
/// publish the manifest, then update the locator. Cancellation compensates
/// by removing the staged remote objects unless the locator was already
/// updated.
#[derive(Debug)]
pub struct CompactionPublishSaga {
    // Saga identity; its key seeds the per-step idempotency keys.
    saga: Saga,
    // Remote object id under which the manifest is published.
    manifest_id: ObjectId,
    // Segment ids uploaded during the current staging attempt.
    staged_segments: Vec<ObjectId>,
    // Current position in the forward state machine.
    phase: CompactionPhase,
    // True once the locator points at the new manifest.
    locator_updated: bool,
}
1334
1335impl CompactionPublishSaga {
1336 #[must_use]
1337 pub fn new(saga: Saga, manifest_id: ObjectId) -> Self {
1338 Self {
1339 saga,
1340 manifest_id,
1341 staged_segments: Vec::new(),
1342 phase: CompactionPhase::Init,
1343 locator_updated: false,
1344 }
1345 }
1346
1347 #[must_use]
1348 pub const fn phase(&self) -> CompactionPhase {
1349 self.phase
1350 }
1351
1352 #[must_use]
1353 pub const fn locator_updated(&self) -> bool {
1354 self.locator_updated
1355 }
1356
1357 pub fn stage_segments(
1364 &mut self,
1365 remote: &mut InMemoryRemoteStore,
1366 segments: &[(ObjectId, Vec<u8>)],
1367 ) -> Result<()> {
1368 if !matches!(
1369 self.phase,
1370 CompactionPhase::Init | CompactionPhase::Cancelled
1371 ) {
1372 return Err(FrankenError::Internal(format!(
1373 "compaction stage invalid in phase {:?}",
1374 self.phase
1375 )));
1376 }
1377 if segments.is_empty() {
1378 return Err(FrankenError::OutOfRange {
1379 what: "compaction_segments".to_owned(),
1380 value: "0".to_owned(),
1381 });
1382 }
1383
1384 self.staged_segments.clear();
1385 for (segment_id, payload) in segments {
1386 let key = derive_step_key(self.saga.key(), *segment_id, b"compaction_segment_put");
1387 remote.put_segment(*segment_id, payload, key)?;
1388 self.staged_segments.push(*segment_id);
1389 }
1390 self.phase = CompactionPhase::SegmentsStaged;
1391 self.locator_updated = false;
1392 Ok(())
1393 }
1394
1395 pub fn publish_manifest(
1401 &mut self,
1402 remote: &mut InMemoryRemoteStore,
1403 manifest: &[u8],
1404 ) -> Result<()> {
1405 if self.phase != CompactionPhase::SegmentsStaged {
1406 return Err(FrankenError::Internal(format!(
1407 "compaction publish invalid in phase {:?}",
1408 self.phase
1409 )));
1410 }
1411 let key = derive_step_key(
1412 self.saga.key(),
1413 self.manifest_id,
1414 b"compaction_manifest_publish",
1415 );
1416 remote.put_segment(self.manifest_id, manifest, key)?;
1417 self.phase = CompactionPhase::Published;
1418 Ok(())
1419 }
1420
1421 pub fn update_locator(&mut self) -> Result<()> {
1427 if self.phase != CompactionPhase::Published {
1428 return Err(FrankenError::Internal(format!(
1429 "compaction locator update invalid in phase {:?}",
1430 self.phase
1431 )));
1432 }
1433 self.locator_updated = true;
1434 self.phase = CompactionPhase::LocatorUpdated;
1435 Ok(())
1436 }
1437
1438 #[must_use]
1443 pub fn cancel(&mut self, remote: &mut InMemoryRemoteStore) -> CompactionCompensation {
1444 match self.phase {
1445 CompactionPhase::LocatorUpdated => CompactionCompensation::RollbackRequired,
1446 CompactionPhase::Init | CompactionPhase::Cancelled => {
1447 self.phase = CompactionPhase::Cancelled;
1448 self.locator_updated = false;
1449 CompactionCompensation::RemoteCleaned
1450 }
1451 CompactionPhase::SegmentsStaged | CompactionPhase::Published => {
1452 for segment in &self.staged_segments {
1453 let _ = remote.remove_segment(*segment);
1454 }
1455 let _ = remote.remove_segment(self.manifest_id);
1456 self.phase = CompactionPhase::Cancelled;
1457 self.locator_updated = false;
1458 debug!(
1459 bead_id = BEAD_ID,
1460 saga_id = format_key(Some(self.saga.key())),
1461 "compaction saga cancelled; remote staged objects cleaned"
1462 );
1463 CompactionCompensation::RemoteCleaned
1464 }
1465 }
1466 }
1467}
1468
1469#[must_use]
1470fn derive_step_key(
1471 base_key: IdempotencyKey,
1472 object_id: ObjectId,
1473 step_tag: &[u8],
1474) -> IdempotencyKey {
1475 let mut bytes = Vec::with_capacity(16 + 16 + step_tag.len());
1476 bytes.extend_from_slice(base_key.as_bytes());
1477 bytes.extend_from_slice(object_id.as_bytes());
1478 bytes.extend_from_slice(step_tag);
1479 derive_idempotency_key(&bytes)
1480}
1481
1482#[must_use]
1483fn format_key(key: Option<IdempotencyKey>) -> String {
1484 key.map_or_else(|| "-".to_owned(), |k| hex16(k.as_bytes()))
1485}
1486
1487#[must_use]
1488fn format_saga(saga: Option<Saga>) -> String {
1489 saga.map_or_else(|| "-".to_owned(), |s| hex16(s.key().as_bytes()))
1490}
1491
/// Encodes a 16-byte array as a 32-character lowercase hex string.
#[must_use]
fn hex16(bytes: &[u8; 16]) -> String {
    use std::fmt::Write;
    bytes.iter().fold(String::with_capacity(32), |mut acc, byte| {
        // Writing to a String is infallible; the result is ignored.
        let _ = write!(acc, "{byte:02x}");
        acc
    })
}
1501
/// RAII guard for one slot in the fallback admission queue.
///
/// The caller increments `queued_waiters` before constructing the guard; the
/// guard then guarantees exactly one matching decrement, either via an
/// explicit [`Self::release`] or implicitly on drop (e.g. when the waiter is
/// cancelled or times out while queued).
///
/// Previously `release` and `Drop::drop` each carried their own copy of the
/// decrement logic; they are now deduplicated so the counter bookkeeping
/// lives in exactly one place and cannot drift.
#[cfg(not(feature = "native"))]
#[derive(Debug)]
struct FallbackQueueReservation<'a> {
    // Shared queue-depth counter this reservation holds one unit of.
    queued_waiters: &'a AtomicUsize,
    // Set once the decrement has happened, so it can never run twice.
    released: bool,
}

#[cfg(not(feature = "native"))]
impl FallbackQueueReservation<'_> {
    /// Wraps an already-incremented counter slot in a guard.
    fn new(queued_waiters: &AtomicUsize) -> FallbackQueueReservation<'_> {
        FallbackQueueReservation {
            queued_waiters,
            released: false,
        }
    }

    /// Releases the queue slot now by consuming the guard.
    ///
    /// Intentionally empty: dropping `self` performs the single idempotent
    /// decrement in `Drop`, so the logic is not duplicated here.
    fn release(self) {}
}

#[cfg(not(feature = "native"))]
impl Drop for FallbackQueueReservation<'_> {
    fn drop(&mut self) {
        // Idempotent: decrement exactly once per reservation.
        if !self.released {
            self.released = true;
            self.queued_waiters.fetch_sub(1, Ordering::AcqRel);
        }
    }
}
1535
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;
    use std::time::Duration;
    #[cfg(not(feature = "native"))]
    use std::time::Instant;

    use super::*;

    /// Deterministic 16-byte remote capability token for tests.
    fn cap_token(seed: u8) -> RemoteCap {
        RemoteCap::from_bytes([seed; 16])
    }

    /// Deterministic 16-byte object id for tests.
    fn segment_id(seed: u8) -> ObjectId {
        ObjectId::from_bytes([seed; 16])
    }

    /// Polls `condition` every 1ms for up to 250ms, asserting on timeout.
    #[cfg(not(feature = "native"))]
    fn wait_for(condition: impl Fn() -> bool) {
        let deadline = Instant::now() + Duration::from_millis(250);
        while Instant::now() < deadline {
            if condition() {
                return;
            }
            thread::sleep(Duration::from_millis(1));
        }
        assert!(condition(), "timed out waiting for condition");
    }

    // Executing with `None` instead of a RemoteCap must fail even for a
    // registered computation.
    #[test]
    fn test_remote_cap_required_for_network_io() {
        let cx = Cx::<cap::All>::new();
        let registry = ComputationRegistry::default();
        let executor = Executor::with_max_in_flight(1).unwrap();
        let computation = NamedComputation::new(ComputationName::SegmentStat, vec![1, 2, 3]);
        let trace = TraceContext::default();

        let err = executor
            .execute(&cx, None, &registry, &computation, &trace, || {
                Ok(vec![0xAA])
            })
            .unwrap_err();

        assert!(matches!(err, FrankenError::Internal(_)));
    }

    // Same missing-cap rejection, but under a lab trace (seed + schedule
    // fingerprint set): the failure must still be a graceful Internal error.
    #[test]
    fn test_remote_cap_omitted_in_lab_fails_gracefully() {
        let cx = Cx::<cap::All>::new();
        let registry = ComputationRegistry::default();
        let executor = Executor::with_max_in_flight(1).unwrap();
        let computation = NamedComputation::new(ComputationName::SegmentStat, vec![0xAA]);
        let trace = TraceContext {
            trace_id: "lab-no-remote".to_owned(),
            lab_seed: Some(17),
            schedule_fingerprint: Some("sched-A".to_owned()),
            ..TraceContext::default()
        };

        let err = executor
            .execute(&cx, None, &registry, &computation, &trace, || {
                Ok(vec![0xBB])
            })
            .unwrap_err();
        assert!(matches!(err, FrankenError::Internal(_)));
    }

    // Registered names validate; an unregistered Custom name is rejected.
    #[test]
    fn test_named_computation_registry_and_unregistered_rejection() {
        let mut registry = ComputationRegistry::new_empty();
        registry.register(ComputationName::SymbolGetRange);
        registry.register(ComputationName::SymbolPutBatch);
        registry.register(ComputationName::SegmentPut);
        registry.register(ComputationName::SegmentStat);

        assert!(registry.validate(&ComputationName::SegmentPut).is_ok());
        assert!(
            registry
                .validate(&ComputationName::Custom("unregistered".to_owned()))
                .is_err()
        );
    }

    // Canonical request bytes are deterministic across calls and embed the
    // REMOTE_IDEMPOTENCY_DOMAIN separator.
    #[test]
    fn test_named_computation_no_closure_shipping_canonical_bytes_deterministic() {
        let computation = NamedComputation::new(
            ComputationName::SymbolGetRange,
            b"obj=01;esi=0..7;epoch=2".to_vec(),
        );
        let bytes_a = computation.canonical_request_bytes().unwrap();
        let bytes_b = computation.canonical_request_bytes().unwrap();
        assert_eq!(bytes_a, bytes_b);
        let domain = REMOTE_IDEMPOTENCY_DOMAIN.as_bytes();
        assert!(bytes_a.windows(domain.len()).any(|window| window == domain));
    }

    // Same request bytes must always derive the same idempotency key.
    #[test]
    fn test_idempotency_key_deterministic() {
        let request = b"segment_put:abc";
        let key_a = derive_idempotency_key(request);
        let key_b = derive_idempotency_key(request);
        assert_eq!(key_a, key_b);
    }

    // Re-registering the same key with the same input replays the first
    // stored result rather than overwriting it.
    #[test]
    fn test_idempotency_dedup_same_key_same_input() {
        let store = IdempotencyStore::new();
        let computation = ComputationName::SegmentPut;
        let request = b"segment_put:id=1";
        let key = derive_idempotency_key(request);

        let first = store
            .register_or_replay(key, &computation, request, b"ok:first")
            .unwrap();
        let second = store
            .register_or_replay(key, &computation, request, b"ok:second")
            .unwrap();

        assert!(matches!(first, IdempotencyDecision::StoredNew(_)));
        assert_eq!(second, IdempotencyDecision::Replayed(b"ok:first".to_vec()));
    }

    // Reusing a key with a different request payload is a conflict error.
    #[test]
    fn test_idempotency_conflict_same_key_different_input() {
        let store = IdempotencyStore::new();
        let computation = ComputationName::SegmentPut;
        let first_request = b"segment_put:id=1";
        let second_request = b"segment_put:id=2";
        let key = derive_idempotency_key(first_request);

        let _ = store
            .register_or_replay(key, &computation, first_request, b"ok:first")
            .unwrap();

        let err = store
            .register_or_replay(key, &computation, second_request, b"ok:second")
            .unwrap_err();
        assert!(matches!(err, FrankenError::Internal(_)));
    }

    // A lease evaluated past its deadline reports Expired with its
    // escalation policy, and enforce surfaces BusyRecovery.
    #[test]
    fn test_lease_backed_liveness_expiry() {
        let trace = TraceContext {
            trace_id: "trace-lease".to_owned(),
            attempt: 1,
            ecs_epoch: 7,
            ..TraceContext::default()
        };
        let handle = LeaseBackedHandle::new(42, 1_000, 100, LeaseEscalation::Retry).unwrap();
        let status = handle.evaluate(1_200);
        assert_eq!(
            status,
            LeaseStatus::Expired {
                escalation: LeaseEscalation::Retry
            }
        );
        let err = handle.enforce(1_200, &trace).unwrap_err();
        assert!(matches!(err, FrankenError::BusyRecovery));
    }

    // Upload, cancel (local retained), then restart the same saga key:
    // the restarted upload dedupes, so the remote sees exactly one upload.
    #[test]
    fn test_e2e_remote_effects_saga_eviction_idempotent_restart() {
        let saga_key = derive_idempotency_key(b"saga:evict:segment-9");
        let saga_id = Saga::new(saga_key);
        let target_segment = segment_id(9);
        let payload = b"segment payload".to_vec();

        let mut remote = InMemoryRemoteStore::new();

        let mut first = EvictionSaga::new(saga_id, target_segment);
        first.upload(&mut remote, &payload).unwrap();
        let compensation = first.cancel();
        assert_eq!(compensation, EvictionCompensation::LocalRetained);
        assert_eq!(first.local_state(), LocalSegmentState::Present);

        let mut restart = EvictionSaga::new(saga_id, target_segment);
        restart.upload(&mut remote, &payload).unwrap();
        restart.verify_remote(&remote).unwrap();
        restart.retire_local().unwrap();

        assert_eq!(restart.local_state(), LocalSegmentState::Retired);
        assert_eq!(remote.upload_count(target_segment), 1);
    }

    // With max_in_flight = 2 and 5 concurrent workers, at most 2 run at
    // once and at least 3 are rejected Busy.
    #[test]
    fn test_remote_bulkhead_concurrency_cap() {
        let executor = Arc::new(Executor::with_max_in_flight(2).unwrap());
        let registry = Arc::new(ComputationRegistry::default());
        let computation = Arc::new(NamedComputation::new(ComputationName::SegmentStat, vec![1]));

        let active = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));
        let busy = Arc::new(AtomicUsize::new(0));

        let start = Arc::new(std::sync::Barrier::new(5));
        let mut workers = Vec::new();
        for _ in 0..5 {
            let exec = Arc::clone(&executor);
            let reg = Arc::clone(&registry);
            let comp = Arc::clone(&computation);
            let active_ctr = Arc::clone(&active);
            let peak_ctr = Arc::clone(&peak);
            let busy_ctr = Arc::clone(&busy);
            let barrier = Arc::clone(&start);
            workers.push(thread::spawn(move || {
                let cx = Cx::<cap::All>::new();
                let trace = TraceContext::default();
                barrier.wait();
                let result = exec.execute(&cx, Some(cap_token(7)), &reg, &comp, &trace, || {
                    // Track the concurrent-execution high-water mark.
                    let now = active_ctr.fetch_add(1, Ordering::AcqRel) + 1;
                    peak_ctr.fetch_max(now, Ordering::AcqRel);
                    thread::sleep(Duration::from_millis(40));
                    active_ctr.fetch_sub(1, Ordering::AcqRel);
                    Ok(vec![1, 2, 3])
                });
                if matches!(result, Err(FrankenError::Busy)) {
                    busy_ctr.fetch_add(1, Ordering::AcqRel);
                }
            }));
        }
        for worker in workers {
            worker.join().unwrap();
        }

        assert!(busy.load(Ordering::Acquire) >= 3);
        assert!(peak.load(Ordering::Acquire) <= 2);
    }

    // PRAGMA value 0 means "auto": derive the limit from available
    // parallelism via conservative_remote_max_in_flight.
    #[test]
    fn test_remote_bulkhead_zero_means_auto() {
        let expected = conservative_remote_max_in_flight(available_parallelism_or_one());
        let executor = Executor::from_pragma_remote_max_in_flight(0).unwrap();
        assert_eq!(executor.max_in_flight(), expected);
    }

    // Fallback path: a waiter queues while a permit is held, then acquires
    // once the permit drops; no rejections or cancellations recorded.
    #[cfg(not(feature = "native"))]
    #[test]
    fn test_remote_bulkhead_fallback_queue_waits_for_capacity() {
        let executor = Arc::new(
            Executor::with_limits("fallback.test", 1, 1, Duration::from_millis(100)).unwrap(),
        );
        let held = executor.try_acquire_for_testing().unwrap();

        let exec = Arc::clone(&executor);
        let waiter = thread::spawn(move || {
            let cx = Cx::<cap::All>::new();
            let permit = exec.acquire(&cx, "segment_stat", None, None, 0).unwrap();
            drop(permit);
        });

        wait_for(|| executor.snapshot().queue_depth == 1);
        assert_eq!(executor.snapshot().active_permits, 1);

        drop(held);
        waiter.join().unwrap();

        let snapshot = executor.snapshot();
        assert_eq!(snapshot.queue_depth, 0);
        assert_eq!(snapshot.total_rejected, 0);
        assert_eq!(snapshot.total_cancelled, 0);
    }

    // Fallback path: cancelling the waiter's Cx while queued yields Busy,
    // frees the queue slot, and bumps the cancelled counter.
    #[cfg(not(feature = "native"))]
    #[test]
    fn test_remote_bulkhead_fallback_cancellation_releases_queue_slot() {
        let executor = Arc::new(
            Executor::with_limits("fallback.test", 1, 1, Duration::from_millis(100)).unwrap(),
        );
        let held = executor.try_acquire_for_testing().unwrap();
        let cx = Cx::<cap::All>::new();
        let waiter_cx = cx.clone();

        let exec = Arc::clone(&executor);
        let waiter = thread::spawn(move || {
            let err = exec
                .acquire(&waiter_cx, "segment_stat", None, None, 0)
                .unwrap_err();
            assert!(matches!(err, FrankenError::Busy));
        });

        wait_for(|| executor.snapshot().queue_depth == 1);
        cx.cancel();
        waiter.join().unwrap();
        drop(held);

        let snapshot = executor.snapshot();
        assert_eq!(snapshot.queue_depth, 0);
        assert_eq!(snapshot.total_cancelled, 1);
    }

    // Happy path: stage -> publish -> update_locator leaves all objects in
    // the remote store and the saga in LocatorUpdated.
    #[test]
    fn test_compaction_publish_saga_forward() {
        let saga_key = derive_idempotency_key(b"saga:compaction:forward");
        let saga_id = Saga::new(saga_key);
        let manifest_id = segment_id(99);
        let mut remote = InMemoryRemoteStore::new();

        let mut saga = CompactionPublishSaga::new(saga_id, manifest_id);
        let segments = vec![
            (segment_id(11), b"seg-11".to_vec()),
            (segment_id(12), b"seg-12".to_vec()),
        ];

        saga.stage_segments(&mut remote, &segments).unwrap();
        saga.publish_manifest(&mut remote, b"manifest-v2").unwrap();
        saga.update_locator().unwrap();

        assert_eq!(saga.phase(), CompactionPhase::LocatorUpdated);
        assert!(saga.locator_updated());
        assert!(remote.has_segment(segment_id(11)));
        assert!(remote.has_segment(segment_id(12)));
        assert!(remote.has_segment(manifest_id));
    }

    // Cancel after publish cleans all staged remote objects; restarting the
    // same saga key re-runs to completion and each object is uploaded once.
    #[test]
    fn test_compaction_publish_saga_compensation_then_restart_idempotent() {
        let saga_key = derive_idempotency_key(b"saga:compaction:restart");
        let saga_id = Saga::new(saga_key);
        let manifest_id = segment_id(101);
        let seg_a = segment_id(21);
        let seg_b = segment_id(22);
        let mut remote = InMemoryRemoteStore::new();
        let segments = vec![(seg_a, b"seg-21".to_vec()), (seg_b, b"seg-22".to_vec())];

        let mut first = CompactionPublishSaga::new(saga_id, manifest_id);
        first.stage_segments(&mut remote, &segments).unwrap();
        first.publish_manifest(&mut remote, b"manifest-v3").unwrap();
        let compensation = first.cancel(&mut remote);
        assert_eq!(compensation, CompactionCompensation::RemoteCleaned);
        assert!(!remote.has_segment(seg_a));
        assert!(!remote.has_segment(seg_b));
        assert!(!remote.has_segment(manifest_id));

        let mut restart = CompactionPublishSaga::new(saga_id, manifest_id);
        restart.stage_segments(&mut remote, &segments).unwrap();
        restart
            .publish_manifest(&mut remote, b"manifest-v3")
            .unwrap();
        restart.update_locator().unwrap();

        assert_eq!(restart.phase(), CompactionPhase::LocatorUpdated);
        assert!(restart.locator_updated());
        assert!(remote.has_segment(seg_a));
        assert!(remote.has_segment(seg_b));
        assert!(remote.has_segment(manifest_id));
        assert_eq!(remote.upload_count(seg_a), 1);
        assert_eq!(remote.upload_count(seg_b), 1);
    }
}