Skip to main content

vox_core/
driver.rs

1use std::{
2    collections::{BTreeMap, HashMap, HashSet},
3    panic::AssertUnwindSafe,
4    pin::Pin,
5    sync::{
6        Arc, Weak,
7        atomic::{AtomicU64, Ordering},
8    },
9};
10
11use vox_types::time::Instant;
12
13use futures_util::future::{AbortHandle, Abortable, FutureExt as FuturesFutureExt};
14use futures_util::stream::{FuturesUnordered, StreamExt as _};
15use moire::sync::{Semaphore, SyncMutex};
16use tokio::sync::watch;
17
18use moire::task::FutureExt as _;
19use vox_types::{
20    BoxFut, CallResult, ChannelBinder, ChannelBody, ChannelClose, ChannelCreditReplenisher,
21    ChannelCreditReplenisherHandle, ChannelEventContext, ChannelId, ChannelItem,
22    ChannelLivenessHandle, ChannelMailboxReceiver, ChannelMailboxSender, ChannelMessage,
23    ChannelRetryMode, ChannelSink, ConnectionId, CreditSink, Handler, IdAllocator,
24    IncomingChannelMessage, MaybeSend, MaybeSendFuture, MaybeSync, Payload, ReplySink, RequestBody,
25    RequestCall, RequestId, RequestMessage, RequestResponse, SelfRef, TrySendError, TxError,
26    VoxError, channel_mailbox, ensure_operation_id, metadata_channel_retry_mode,
27    metadata_operation_id,
28};
29use vox_types::{
30    ChannelCloseReason, ChannelDebugContext, ChannelDirection, ChannelEvent, ChannelResetReason,
31    ChannelSendOutcome, ChannelTrySendOutcome, DriverEvent, RpcOutcome, VoxObserverHandle,
32};
33use vox_types::{
34    ChannelDebugSnapshot, ChannelReceiverState, ConnectionCloseReason, ConnectionDebugSnapshot,
35    ConnectionDebugState, DriverTaskStatus, RequestDebugSnapshot, RequestDebugState,
36    VoxDebugSnapshot,
37};
38
39use crate::session::{
40    ConnectionHandle, ConnectionMessage, ConnectionSender, DropControlRequest, FailureDisposition,
41};
42use crate::{InMemoryOperationStore, OperationStore};
43use moire::sync::mpsc;
44use vox_types::{OperationId, PostcardPayload};
45
/// A pending response for one outbound request attempt.
///
/// Carries both the wire response message and the recv tracker that was
/// current when the response was received, so the caller can deserialize
/// the response with the correct schemas.
struct PendingResponse {
    /// The wire response message.
    msg: SelfRef<RequestMessage<'static>>,
    /// The recv tracker that was current when the response was received.
    schemas: Arc<vox_types::SchemaRecvTracker>,
    /// Descriptors that arrived with the response frame, surfaced to the
    /// caller via [`WithTracker::fds`](vox_types::WithTracker) and installed
    /// at the typed-return decode site. `()` off-Unix.
    fds: vox_types::FrameFds,
}
59
/// Sending half of the oneshot channel used to deliver a [`PendingResponse`].
type ResponseSlot = moire::sync::oneshot::Sender<PendingResponse>;
61
/// Per-request bookkeeping for a handler future that has not completed yet.
struct InFlightHandler {
    /// Aborts the handler future hosted on `Driver::handler_futs`. Triggered
    /// by `Cancel`-style flows; the FuturesUnordered will yield an `Aborted`
    /// item on its next poll, and the request will be removed from
    /// `in_flight_handlers` (if not already gone).
    abort: AbortHandle,
    /// Method id recorded for this handler.
    method_id: vox_types::MethodId,
    /// Retry policy recorded for this handler.
    retry: vox_types::RetryPolicy,
    /// Operation id recorded for this handler, if any.
    /// NOTE(review): presumably `None` when the request carried no operation id — confirm.
    operation_id: Option<OperationId>,
}
72
/// Value yielded by a handler future hosted on `Driver::handler_futs`.
///
/// Every variant carries the `RequestId` the future was attached to so the
/// driver can clean up the `in_flight_handlers` entry when it completes.
enum HandlerCompletion {
    /// The handler future completed; carries the request it was attached to.
    Finished(RequestId),
    /// NOTE(review): presumably produced when the handler panicked and the
    /// panic was caught — confirm against the code that builds the future.
    Panicked {
        request_id: RequestId,
        method_id: vox_types::MethodId,
    },
}

/// Boxed handler future hosted on `Driver::handler_futs`.
///
/// We `Box::pin` because the handler returns an unnameable `async move {}`
/// and we want `FuturesUnordered` to hold a single concrete element type.
/// Total alloc footprint per request is one `Box<dyn Future>` plus one
/// `Arc<Task>` (allocated by `FuturesUnordered::push`). Compared to
/// `tokio::spawn` (which allocates a `Cell<T, S>` containing
/// `Stage<Future, Output>` plus does scheduler registration), this drops
/// the `Stage` overhead and the `set_stage` memcpy that fires on
/// `Running → Finished` transitions.
type HandlerFut = Abortable<Pin<Box<dyn MaybeSendFuture<Output = HandlerCompletion> + 'static>>>;
94
/// Selects how channel runtime state is torn down.
///
/// NOTE(review): the consumers of this enum are outside this view; the
/// variant descriptions below are inferred from their names — confirm.
#[derive(Clone, Copy, Debug)]
enum ChannelRuntimeTeardown {
    /// Presumably: drop the runtime state without attributing a close reason.
    DropOnly,
    /// Presumably: the connection closed, for the carried reason.
    ConnectionClosed(ConnectionCloseReason),
}
100
101// ============================================================================
102// Live operation tracking (driver-local, not persisted)
103// ============================================================================
104
/// Tracks in-flight operations within the current session.
///
/// This is session-scoped state that does NOT survive crashes. The
/// `OperationStore` handles persistence; this handles the live
/// attach/waiter/conflict logic.
///
/// Both maps are kept in step: every request ID listed in a live
/// operation's `waiters` has a matching `request_to_operation` entry.
struct LiveOperationTracker {
    /// Maps operation_id → live state. Removed when sealed or released.
    live: HashMap<OperationId, LiveOperation>,
    /// Maps request_id → operation_id for cancel routing.
    request_to_operation: HashMap<RequestId, OperationId>,
}
116
/// Live state of a single admitted operation.
struct LiveOperation {
    /// Method the operation was first admitted with; later attempts must match.
    method_id: vox_types::MethodId,
    /// Hash over the method id and the raw argument bytes, used to detect an
    /// operation ID being reused with a different call.
    args_hash: u64,
    /// Request that first admitted the operation.
    owner_request_id: RequestId,
    /// Every request attached to the operation, starting with the owner.
    waiters: Vec<RequestId>,
    /// Retry policy supplied by the owning request; `retry.persist` decides
    /// how cancellation is handled.
    retry: vox_types::RetryPolicy,
}
124
/// Outcome of [`LiveOperationTracker::admit`].
enum AdmitResult {
    /// New operation — run the handler.
    Start,
    /// Same operation already in flight — wait for its result.
    Attached,
    /// Same operation ID but different method/args — protocol error.
    Conflict,
}
133
134impl LiveOperationTracker {
135    fn new() -> Self {
136        Self {
137            live: HashMap::new(),
138            request_to_operation: HashMap::new(),
139        }
140    }
141
142    fn admit(
143        &mut self,
144        operation_id: OperationId,
145        method_id: vox_types::MethodId,
146        args: &[u8],
147        retry: vox_types::RetryPolicy,
148        request_id: RequestId,
149    ) -> AdmitResult {
150        use std::hash::{Hash, Hasher};
151        let args_hash = {
152            let mut h = std::collections::hash_map::DefaultHasher::new();
153            method_id.hash(&mut h);
154            args.hash(&mut h);
155            h.finish()
156        };
157        let live_operations = self.live.len();
158
159        if let Some(live) = self.live.get_mut(&operation_id) {
160            if live.method_id != method_id || live.args_hash != args_hash {
161                let request_bindings = self.request_to_operation.len();
162                tracing::trace!(
163                    %operation_id,
164                    %request_id,
165                    ?method_id,
166                    live_operations,
167                    request_bindings,
168                    "live operation conflict"
169                );
170                return AdmitResult::Conflict;
171            }
172            live.waiters.push(request_id);
173            self.request_to_operation.insert(request_id, operation_id);
174            let waiters = live.waiters.len();
175            let request_bindings = self.request_to_operation.len();
176            tracing::trace!(
177                %operation_id,
178                %request_id,
179                ?method_id,
180                waiters,
181                live_operations,
182                request_bindings,
183                "live operation attached"
184            );
185            return AdmitResult::Attached;
186        }
187
188        self.live.insert(
189            operation_id,
190            LiveOperation {
191                method_id,
192                args_hash,
193                owner_request_id: request_id,
194                waiters: vec![request_id],
195                retry,
196            },
197        );
198        self.request_to_operation.insert(request_id, operation_id);
199        let live_operations = self.live.len();
200        let request_bindings = self.request_to_operation.len();
201        tracing::trace!(
202            %operation_id,
203            %request_id,
204            ?method_id,
205            live_operations,
206            request_bindings,
207            "live operation admitted"
208        );
209        AdmitResult::Start
210    }
211
212    /// Seal a live operation, returning all waiter request IDs (including the owner).
213    fn seal(&mut self, operation_id: OperationId) -> Vec<RequestId> {
214        if let Some(live) = self.live.remove(&operation_id) {
215            for waiter in &live.waiters {
216                self.request_to_operation.remove(waiter);
217            }
218            let waiters = live.waiters.len();
219            let live_operations = self.live.len();
220            let request_bindings = self.request_to_operation.len();
221            tracing::trace!(
222                %operation_id,
223                waiters,
224                live_operations,
225                request_bindings,
226                "live operation sealed"
227            );
228            live.waiters
229        } else {
230            vec![]
231        }
232    }
233
234    /// Release a live operation without sealing (handler failed).
235    fn release(&mut self, operation_id: OperationId) -> Option<LiveOperation> {
236        if let Some(live) = self.live.remove(&operation_id) {
237            for waiter in &live.waiters {
238                self.request_to_operation.remove(waiter);
239            }
240            let waiters = live.waiters.len();
241            let live_operations = self.live.len();
242            let request_bindings = self.request_to_operation.len();
243            tracing::trace!(
244                %operation_id,
245                waiters,
246                live_operations,
247                request_bindings,
248                "live operation released"
249            );
250            Some(live)
251        } else {
252            None
253        }
254    }
255
256    /// Cancel a request. Returns what to do.
257    fn cancel(&mut self, request_id: RequestId) -> CancelResult {
258        let Some(&operation_id) = self.request_to_operation.get(&request_id) else {
259            return CancelResult::NotFound;
260        };
261        let live_operations = self.live.len();
262        let Some(live) = self.live.get_mut(&operation_id) else {
263            self.request_to_operation.remove(&request_id);
264            return CancelResult::NotFound;
265        };
266
267        if live.retry.persist {
268            // Persistent operations: only detach non-owner waiters.
269            if live.owner_request_id == request_id {
270                return CancelResult::NotFound; // Can't cancel the owner of a persistent op
271            }
272            live.waiters.retain(|w| *w != request_id);
273            self.request_to_operation.remove(&request_id);
274            let waiters = live.waiters.len();
275            let request_bindings = self.request_to_operation.len();
276            tracing::trace!(
277                %operation_id,
278                %request_id,
279                waiters,
280                live_operations,
281                request_bindings,
282                "live operation detached waiter"
283            );
284            CancelResult::Detached
285        } else {
286            // Non-persistent: abort the whole operation.
287            let live = self.live.remove(&operation_id).unwrap();
288            for waiter in &live.waiters {
289                self.request_to_operation.remove(waiter);
290            }
291            let waiters = live.waiters.len();
292            let live_operations = self.live.len();
293            let request_bindings = self.request_to_operation.len();
294            tracing::trace!(
295                %operation_id,
296                %request_id,
297                waiters,
298                live_operations,
299                request_bindings,
300                "live operation aborted"
301            );
302            CancelResult::Abort {
303                owner_request_id: live.owner_request_id,
304                waiters: live.waiters,
305            }
306        }
307    }
308}
309
/// Outcome of [`LiveOperationTracker::cancel`].
enum CancelResult {
    /// The request is not bound to a live operation, or it is the owner of a
    /// persistent operation and therefore cannot be cancelled.
    NotFound,
    /// The request was a non-owner waiter of a persistent operation and has
    /// been detached; the operation itself stays live.
    Detached,
    /// A non-persistent operation was removed as a whole.
    Abort {
        /// Request that originally admitted the operation.
        owner_request_id: RequestId,
        /// All requests that were attached, including the owner.
        waiters: Vec<RequestId>,
    },
}
318
/// Debug record kept per request and turned into a `RequestDebugSnapshot`
/// on demand.
#[derive(Clone)]
struct RequestRuntimeDebug {
    /// Method id of the request.
    method_id: vox_types::MethodId,
    /// Service name, when known.
    service: Option<&'static str>,
    /// Method name, when known.
    method: Option<&'static str>,
    /// When the record was created; snapshots report age relative to this.
    started_at: Instant,
    /// Current debug state of the request.
    state: RequestDebugState,
    /// NOTE(review): presumably whether the response sender is currently
    /// blocked, `None` meaning unknown — confirm against the writers.
    response_sender_blocked: Option<bool>,
    /// Channels associated with this request.
    associated_channels: Vec<ChannelId>,
}
329
330impl RequestRuntimeDebug {
331    fn snapshot(&self, request_id: RequestId, now: Instant) -> RequestDebugSnapshot {
332        RequestDebugSnapshot {
333            request_id,
334            service: self.service,
335            method: self.method,
336            method_id: self.method_id,
337            age: now.saturating_duration_since(self.started_at),
338            state: self.state,
339            response_sender_blocked: self.response_sender_blocked,
340            associated_channels: self.associated_channels.clone(),
341        }
342    }
343}
344
/// Debug counters and timestamps kept per channel and turned into a
/// `ChannelDebugSnapshot` on demand.
///
/// All counters are bumped with saturating arithmetic.
#[derive(Clone)]
struct ChannelRuntimeDebug {
    /// Direction the record was created with.
    direction: ChannelDirection,
    /// Optional debug context; `merge_debug` fills it only while it is `None`.
    debug: Option<ChannelDebugContext>,
    /// Initial credit the record was created with.
    initial_credit: u32,
    /// Incremented per received item; decremented per consumed item and per
    /// item that was not enqueued.
    inbound_queue_len: usize,
    /// `Some(initial_credit)` for `Rx` channels, `None` for `Tx`.
    inbound_queue_capacity: Option<usize>,
    /// Starts as `Present`; changed by close, reset and receiver-drop marks.
    receiver_state: ChannelReceiverState,
    // Timestamps of the most recent event of each kind.
    last_item_sent_at: Option<Instant>,
    last_item_received_at: Option<Instant>,
    last_item_consumed_at: Option<Instant>,
    last_credit_granted_at: Option<Instant>,
    last_credit_received_at: Option<Instant>,
    // Amount carried by the most recent credit grant / receipt.
    last_credit_granted_amount: Option<u32>,
    last_credit_received_amount: Option<u32>,
    /// Reset to zero whenever a credit grant is recorded.
    pending_local_grant_credit: u32,
    // Sum of all credit amounts granted / received.
    total_credit_granted: u64,
    total_credit_received: u64,
    /// Successful sends, counted for both `send` and `try_send`.
    sent: u64,
    sends_started: u64,
    sends_completed: u64,
    sends_waited_for_credit: u64,
    // `try_send` attempts rejected for lack of credit / a full runtime queue.
    try_send_full_credit: u64,
    try_send_full_runtime_queue: u64,
    // Number of close / reset / drop marks recorded.
    closed: u64,
    reset: u64,
    dropped: u64,
    items_received: u64,
    items_consumed: u64,
    // Number of credit grant / receipt events (not amounts).
    credit_granted: u64,
    credit_received: u64,
    /// Reason of the most recent close mark.
    close_reason: Option<ChannelCloseReason>,
    /// Reason of the most recent reset (or receiver-drop) mark.
    reset_reason: Option<ChannelResetReason>,
}
379
380impl ChannelRuntimeDebug {
381    fn new(
382        direction: ChannelDirection,
383        initial_credit: u32,
384        debug: Option<ChannelDebugContext>,
385    ) -> Self {
386        Self {
387            direction,
388            debug,
389            initial_credit,
390            inbound_queue_len: 0,
391            inbound_queue_capacity: match direction {
392                ChannelDirection::Rx => Some(initial_credit as usize),
393                ChannelDirection::Tx => None,
394            },
395            receiver_state: ChannelReceiverState::Present,
396            last_item_sent_at: None,
397            last_item_received_at: None,
398            last_item_consumed_at: None,
399            last_credit_granted_at: None,
400            last_credit_received_at: None,
401            last_credit_granted_amount: None,
402            last_credit_received_amount: None,
403            pending_local_grant_credit: 0,
404            total_credit_granted: 0,
405            total_credit_received: 0,
406            sent: 0,
407            sends_started: 0,
408            sends_completed: 0,
409            sends_waited_for_credit: 0,
410            try_send_full_credit: 0,
411            try_send_full_runtime_queue: 0,
412            closed: 0,
413            reset: 0,
414            dropped: 0,
415            items_received: 0,
416            items_consumed: 0,
417            credit_granted: 0,
418            credit_received: 0,
419            close_reason: None,
420            reset_reason: None,
421        }
422    }
423
424    fn merge_debug(&mut self, debug: Option<ChannelDebugContext>) {
425        if self.debug.is_none() {
426            self.debug = debug;
427        }
428    }
429
430    fn mark_item_received(&mut self, now: Instant) {
431        self.items_received = self.items_received.saturating_add(1);
432        self.inbound_queue_len = self.inbound_queue_len.saturating_add(1);
433        self.last_item_received_at = Some(now);
434    }
435
436    fn mark_closed(&mut self, reason: ChannelCloseReason) {
437        self.closed = self.closed.saturating_add(1);
438        self.close_reason = Some(reason);
439        self.receiver_state = ChannelReceiverState::Closed;
440        if reason == ChannelCloseReason::Dropped {
441            self.dropped = self.dropped.saturating_add(1);
442            self.receiver_state = ChannelReceiverState::Dropped;
443        }
444    }
445
446    fn mark_reset(&mut self, reason: ChannelResetReason) {
447        self.reset = self.reset.saturating_add(1);
448        self.reset_reason = Some(reason);
449        self.receiver_state = ChannelReceiverState::Reset;
450    }
451
452    fn mark_send_started(&mut self) {
453        self.sends_started = self.sends_started.saturating_add(1);
454    }
455
456    fn mark_send_waiting_for_credit(&mut self) {
457        self.sends_waited_for_credit = self.sends_waited_for_credit.saturating_add(1);
458    }
459
460    fn mark_send_finished(&mut self, outcome: ChannelSendOutcome, now: Instant) {
461        self.sends_completed = self.sends_completed.saturating_add(1);
462        if outcome == ChannelSendOutcome::Sent {
463            self.sent = self.sent.saturating_add(1);
464            self.last_item_sent_at = Some(now);
465        }
466    }
467
468    fn mark_try_send_outcome(&mut self, outcome: ChannelTrySendOutcome, now: Instant) {
469        match outcome {
470            ChannelTrySendOutcome::Sent => {
471                self.sent = self.sent.saturating_add(1);
472                self.last_item_sent_at = Some(now);
473            }
474            ChannelTrySendOutcome::FullCredit => {
475                self.try_send_full_credit = self.try_send_full_credit.saturating_add(1);
476            }
477            ChannelTrySendOutcome::FullRuntimeQueue => {
478                self.try_send_full_runtime_queue =
479                    self.try_send_full_runtime_queue.saturating_add(1);
480            }
481            ChannelTrySendOutcome::Unbound | ChannelTrySendOutcome::Closed => {}
482        }
483    }
484
485    fn mark_item_consumed(&mut self, now: Instant) {
486        self.items_consumed = self.items_consumed.saturating_add(1);
487        self.inbound_queue_len = self.inbound_queue_len.saturating_sub(1);
488        self.last_item_consumed_at = Some(now);
489    }
490
491    fn mark_inbound_item_not_enqueued(&mut self) {
492        self.inbound_queue_len = self.inbound_queue_len.saturating_sub(1);
493    }
494
495    fn mark_credit_granted(&mut self, amount: u32, now: Instant) {
496        self.credit_granted = self.credit_granted.saturating_add(1);
497        self.total_credit_granted = self.total_credit_granted.saturating_add(amount as u64);
498        self.last_credit_granted_at = Some(now);
499        self.last_credit_granted_amount = Some(amount);
500        self.pending_local_grant_credit = 0;
501    }
502
503    fn mark_credit_received(&mut self, amount: u32, now: Instant) {
504        self.credit_received = self.credit_received.saturating_add(1);
505        self.total_credit_received = self.total_credit_received.saturating_add(amount as u64);
506        self.last_credit_received_at = Some(now);
507        self.last_credit_received_amount = Some(amount);
508    }
509
510    fn mark_receiver_dropped(&mut self) {
511        self.reset = self.reset.saturating_add(1);
512        self.reset_reason = Some(ChannelResetReason::ReceiverDropped);
513        self.receiver_state = ChannelReceiverState::Dropped;
514        self.dropped = self.dropped.saturating_add(1);
515    }
516
517    fn snapshot(
518        &self,
519        connection_id: ConnectionId,
520        channel_id: ChannelId,
521        available_send_credit: Option<u32>,
522    ) -> ChannelDebugSnapshot {
523        ChannelDebugSnapshot {
524            connection_id,
525            channel_id,
526            direction: self.direction,
527            debug: self.debug,
528            initial_credit: self.initial_credit,
529            available_send_credit,
530            inbound_queue_len: Some(self.inbound_queue_len),
531            inbound_queue_capacity: self.inbound_queue_capacity,
532            outbound_runtime_queue_len: None,
533            outbound_runtime_queue_capacity: None,
534            send_waiters_count: None,
535            receiver_state: self.receiver_state,
536            last_item_sent_at: self.last_item_sent_at,
537            last_item_received_at: self.last_item_received_at,
538            last_item_consumed_at: self.last_item_consumed_at,
539            last_credit_granted_at: self.last_credit_granted_at,
540            last_credit_received_at: self.last_credit_received_at,
541            last_credit_granted_amount: self.last_credit_granted_amount,
542            last_credit_received_amount: self.last_credit_received_amount,
543            pending_local_grant_credit: self.pending_local_grant_credit,
544            total_credit_granted: self.total_credit_granted,
545            total_credit_received: self.total_credit_received,
546            current_permit_count: available_send_credit,
547            zero_credit_with_blocked_senders: available_send_credit == Some(0)
548                && self.sends_waited_for_credit > 0,
549            sent: self.sent,
550            sends_started: self.sends_started,
551            sends_completed: self.sends_completed,
552            sends_waited_for_credit: self.sends_waited_for_credit,
553            try_send_full_credit: self.try_send_full_credit,
554            try_send_full_runtime_queue: self.try_send_full_runtime_queue,
555            closed: self.closed,
556            reset: self.reset,
557            dropped: self.dropped,
558            items_received: self.items_received,
559            items_consumed: self.items_consumed,
560            credit_granted: self.credit_granted,
561            credit_received: self.credit_received,
562            close_reason: self.close_reason,
563            reset_reason: self.reset_reason,
564        }
565    }
566}
567
/// State shared between the driver loop and any `DriverCaller` / `DriverChannelSink` handles.
///
/// `pending_responses` is keyed by request ID and therefore tracks live
/// request attempts, not logical operations.
struct DriverShared {
    /// Connection this state belongs to; stamped onto channel event contexts.
    connection_id: ConnectionId,
    /// Response slots of outbound request attempts, keyed by request ID.
    pending_responses: SyncMutex<BTreeMap<RequestId, ResponseSlot>>,
    /// Allocator for request IDs.
    request_ids: SyncMutex<IdAllocator<RequestId>>,
    /// NOTE(review): presumably the counter used to mint operation IDs — confirm.
    next_operation_id: AtomicU64,
    /// Operation store (see `OperationStore`).
    operations: Arc<dyn OperationStore>,
    /// Allocator for channel IDs.
    channel_ids: SyncMutex<IdAllocator<ChannelId>>,
    /// Registry mapping inbound channel IDs to the sender that feeds the Rx handle.
    channel_senders: SyncMutex<BTreeMap<ChannelId, ChannelMailboxSender<IncomingChannelMessage>>>,
    /// Receivers for channels that received messages before application code
    /// deserialized/registered the corresponding `Rx` handle.
    channel_receivers:
        SyncMutex<BTreeMap<ChannelId, ChannelMailboxReceiver<IncomingChannelMessage>>>,
    /// Credit semaphores for outbound channels (Tx on our side).
    /// The driver's GrantCredit handler adds permits to these.
    channel_credits: SyncMutex<BTreeMap<ChannelId, Arc<Semaphore>>>,
    // r[impl rpc.observability.channel.context]
    /// Last remembered debug context per channel; used as the fallback when
    /// an event is emitted without an explicit context.
    channel_contexts: SyncMutex<BTreeMap<ChannelId, ChannelDebugContext>>,
    // r[impl rpc.debug.snapshot]
    /// Per-request debug records.
    request_debug: SyncMutex<BTreeMap<RequestId, RequestRuntimeDebug>>,
    // r[impl rpc.debug.snapshot]
    /// Per-channel debug records.
    channel_debug: SyncMutex<BTreeMap<ChannelId, ChannelRuntimeDebug>>,
    /// Set to the current time by `mark_inbound_progress`.
    last_inbound_message_at: SyncMutex<Option<Instant>>,
    /// Set to the current time by `mark_outbound_progress`.
    last_outbound_message_at: SyncMutex<Option<Instant>>,
    /// Reason the connection closed, once known.
    close_reason: SyncMutex<Option<ConnectionCloseReason>>,
    /// Channel IDs that have reached a terminal local state. Once a channel is
    /// closed/reset, outbound sinks must reject further sends and inbound items
    /// must not be buffered forever.
    terminal_channels: SyncMutex<HashSet<ChannelId>>,
    /// Channel IDs cleared during session resume. When handler tasks that owned
    /// these channels are aborted, they may trigger `close_channel_on_drop`, which
    /// would send a ChannelClose message for a channel the peer no longer knows about.
    /// We suppress those Close messages by checking this set.
    stale_close_channels: SyncMutex<std::collections::HashSet<ChannelId>>,
    // r[impl rpc.flow-control.credit.initial]
    /// Initial channel credit on the local side; also sizes new channel mailboxes.
    local_initial_channel_credit: u32,
    // r[impl rpc.flow-control.credit.initial]
    /// Initial channel credit on the peer side.
    peer_initial_channel_credit: u32,
    /// Optional observer notified of channel events.
    observer: Option<VoxObserverHandle>,
}
612
613impl DriverShared {
614    fn remember_channel_context(
615        &self,
616        channel_id: ChannelId,
617        debug_context: Option<ChannelDebugContext>,
618    ) {
619        if let Some(debug_context) = debug_context.and_then(ChannelDebugContext::into_option) {
620            self.channel_contexts
621                .lock()
622                .insert(channel_id, debug_context);
623            if let Some(channel) = self.channel_debug.lock().get_mut(&channel_id) {
624                channel.debug = Some(debug_context);
625            }
626        }
627    }
628
629    fn channel_event_context(
630        &self,
631        channel_id: ChannelId,
632        debug_context: Option<ChannelDebugContext>,
633    ) -> ChannelEventContext {
634        let debug = debug_context
635            .and_then(ChannelDebugContext::into_option)
636            .or_else(|| self.channel_contexts.lock().get(&channel_id).copied());
637        ChannelEventContext {
638            connection_id: Some(self.connection_id),
639            channel_id,
640            debug,
641        }
642    }
643
644    fn emit_channel_event(
645        &self,
646        channel_id: ChannelId,
647        debug_context: Option<ChannelDebugContext>,
648        event: impl FnOnce(ChannelEventContext) -> ChannelEvent,
649    ) {
650        if let Some(observer) = &self.observer {
651            observer.channel_event(event(self.channel_event_context(channel_id, debug_context)));
652        }
653    }
654
655    fn observe_channel(
656        &self,
657        channel_id: ChannelId,
658        debug_context: Option<ChannelDebugContext>,
659        event: impl FnOnce(ChannelEventContext) -> ChannelEvent,
660    ) {
661        let event = event(self.channel_event_context(channel_id, debug_context));
662        self.record_channel_event(event);
663        if let Some(observer) = &self.observer {
664            observer.channel_event(event);
665        }
666    }
667
668    fn update_channel_debug(
669        &self,
670        channel: ChannelEventContext,
671        default_direction: ChannelDirection,
672        default_initial_credit: u32,
673        update: impl FnOnce(&mut ChannelRuntimeDebug),
674    ) {
675        let mut channels = self.channel_debug.lock();
676        let entry = channels.entry(channel.channel_id).or_insert_with(|| {
677            ChannelRuntimeDebug::new(default_direction, default_initial_credit, channel.debug)
678        });
679        entry.merge_debug(channel.debug);
680        update(entry);
681    }
682
683    fn update_existing_channel_debug(
684        &self,
685        channel_id: ChannelId,
686        update: impl FnOnce(&mut ChannelRuntimeDebug),
687    ) {
688        if let Some(channel) = self.channel_debug.lock().get_mut(&channel_id) {
689            update(channel);
690        }
691    }
692
693    fn record_channel_event(&self, event: ChannelEvent) {
694        let now = Instant::now();
695        match event {
696            ChannelEvent::Opened {
697                channel,
698                direction,
699                initial_credit,
700            } => {
701                self.channel_debug.lock().insert(
702                    channel.channel_id,
703                    ChannelRuntimeDebug::new(direction, initial_credit, channel.debug),
704                );
705            }
706            ChannelEvent::ItemReceived { channel } => {
707                self.update_channel_debug(channel, ChannelDirection::Rx, 0, |entry| {
708                    entry.mark_item_received(now);
709                });
710            }
711            ChannelEvent::Closed { channel, reason } => {
712                self.update_channel_debug(channel, ChannelDirection::Rx, 0, |entry| {
713                    entry.mark_closed(reason);
714                });
715            }
716            ChannelEvent::Reset { channel, reason } => {
717                self.update_channel_debug(channel, ChannelDirection::Rx, 0, |entry| {
718                    entry.mark_reset(reason);
719                });
720            }
721            ChannelEvent::CreditGranted { channel, amount } => {
722                self.record_credit_granted_at(channel.channel_id, amount, now);
723            }
724            ChannelEvent::SendStarted { channel } => {
725                self.record_send_started(channel.channel_id);
726            }
727            ChannelEvent::SendWaitingForCredit { channel } => {
728                self.record_send_waiting_for_credit(channel.channel_id);
729            }
730            ChannelEvent::SendFinished {
731                channel, outcome, ..
732            } => {
733                self.record_send_finished(channel.channel_id, outcome);
734            }
735            ChannelEvent::TrySend { channel, outcome } => {
736                self.record_try_send_outcome(channel.channel_id, outcome);
737            }
738            ChannelEvent::ItemConsumed { channel } => {
739                self.record_item_consumed(channel.channel_id);
740            }
741        }
742    }
743
744    fn mark_inbound_progress(&self) {
745        *self.last_inbound_message_at.lock() = Some(Instant::now());
746    }
747
748    fn mark_outbound_progress(&self) {
749        *self.last_outbound_message_at.lock() = Some(Instant::now());
750    }
751
752    fn start_request(
753        &self,
754        request_id: RequestId,
755        method_id: vox_types::MethodId,
756        service: Option<&'static str>,
757        method: Option<&'static str>,
758        state: RequestDebugState,
759    ) {
760        self.request_debug.lock().insert(
761            request_id,
762            RequestRuntimeDebug {
763                method_id,
764                service,
765                method,
766                started_at: Instant::now(),
767                state,
768                response_sender_blocked: Some(false),
769                associated_channels: Vec::new(),
770            },
771        );
772    }
773
774    fn finish_request(&self, request_id: RequestId, state: RequestDebugState) {
775        if let Some(request) = self.request_debug.lock().get_mut(&request_id) {
776            request.state = state;
777        }
778        self.request_debug.lock().remove(&request_id);
779    }
780
781    fn record_send_started(&self, channel_id: ChannelId) {
782        self.update_existing_channel_debug(channel_id, ChannelRuntimeDebug::mark_send_started);
783    }
784
785    fn record_send_waiting_for_credit(&self, channel_id: ChannelId) {
786        self.update_existing_channel_debug(
787            channel_id,
788            ChannelRuntimeDebug::mark_send_waiting_for_credit,
789        );
790    }
791
792    fn record_send_finished(&self, channel_id: ChannelId, outcome: ChannelSendOutcome) {
793        let now = Instant::now();
794        self.update_existing_channel_debug(channel_id, |channel| {
795            channel.mark_send_finished(outcome, now);
796        });
797    }
798
799    fn record_try_send_outcome(&self, channel_id: ChannelId, outcome: ChannelTrySendOutcome) {
800        let now = Instant::now();
801        self.update_existing_channel_debug(channel_id, |channel| {
802            channel.mark_try_send_outcome(outcome, now);
803        });
804    }
805
806    fn record_item_consumed(&self, channel_id: ChannelId) {
807        let now = Instant::now();
808        self.update_existing_channel_debug(channel_id, |channel| {
809            channel.mark_item_consumed(now);
810        });
811    }
812
813    fn record_inbound_item_not_enqueued(&self, channel_id: ChannelId) {
814        self.update_existing_channel_debug(
815            channel_id,
816            ChannelRuntimeDebug::mark_inbound_item_not_enqueued,
817        );
818    }
819
820    fn record_pending_local_grant(&self, channel_id: ChannelId, pending: u32) {
821        self.update_existing_channel_debug(channel_id, |channel| {
822            channel.pending_local_grant_credit = pending;
823        });
824    }
825
826    fn record_credit_granted_at(&self, channel_id: ChannelId, amount: u32, now: Instant) {
827        self.update_existing_channel_debug(channel_id, |channel| {
828            channel.mark_credit_granted(amount, now);
829        });
830    }
831
832    fn record_credit_received(&self, channel_id: ChannelId, amount: u32) {
833        let now = Instant::now();
834        self.update_existing_channel_debug(channel_id, |channel| {
835            channel.mark_credit_received(amount, now);
836        });
837    }
838
839    fn record_receiver_dropped(&self, channel_id: ChannelId) {
840        self.update_existing_channel_debug(channel_id, ChannelRuntimeDebug::mark_receiver_dropped);
841    }
842
843    fn new_channel_mailbox(
844        &self,
845    ) -> (
846        ChannelMailboxSender<IncomingChannelMessage>,
847        ChannelMailboxReceiver<IncomingChannelMessage>,
848    ) {
849        channel_mailbox(
850            "driver.channel_mailbox",
851            self.local_initial_channel_credit as usize,
852        )
853    }
854
855    fn inbound_channel_sender(
856        &self,
857        channel_id: ChannelId,
858    ) -> ChannelMailboxSender<IncomingChannelMessage> {
859        let mut senders = self.channel_senders.lock();
860        if let Some(sender) = senders.get(&channel_id) {
861            return sender.clone();
862        }
863
864        let (sender, receiver) = self.new_channel_mailbox();
865        senders.insert(channel_id, sender.clone());
866        self.channel_receivers.lock().insert(channel_id, receiver);
867        sender
868    }
869
870    fn register_inbound_channel_receiver(
871        &self,
872        channel_id: ChannelId,
873    ) -> (ChannelMailboxReceiver<IncomingChannelMessage>, bool) {
874        let terminal = self.terminal_channels.lock().contains(&channel_id);
875        let mut senders = self.channel_senders.lock();
876        let mut receivers = self.channel_receivers.lock();
877
878        if let Some(receiver) = receivers.remove(&channel_id) {
879            return (receiver, terminal);
880        }
881
882        let (sender, receiver) = self.new_channel_mailbox();
883        if terminal {
884            drop(sender);
885        } else {
886            senders.insert(channel_id, sender);
887        }
888        (receiver, terminal)
889    }
890
891    fn debug_snapshot(
892        &self,
893        sender: &ConnectionSender,
894        state: ConnectionDebugState,
895        driver_task_status: DriverTaskStatus,
896    ) -> VoxDebugSnapshot {
897        let now = Instant::now();
898        let requests: Vec<_> = self
899            .request_debug
900            .lock()
901            .iter()
902            .map(|(request_id, request)| request.snapshot(*request_id, now))
903            .collect();
904        let credits = self.shared_channel_credit_snapshot();
905        let open_channels: Vec<_> = self
906            .channel_debug
907            .lock()
908            .iter()
909            .map(|(channel_id, channel)| {
910                channel.snapshot(
911                    self.connection_id,
912                    *channel_id,
913                    credits.get(channel_id).copied().flatten(),
914                )
915            })
916            .collect();
917        let last_inbound_message_at = *self.last_inbound_message_at.lock();
918        let last_outbound_message_at = *self.last_outbound_message_at.lock();
919        let last_progress_at = match (last_inbound_message_at, last_outbound_message_at) {
920            (Some(inbound), Some(outbound)) => Some(inbound.max(outbound)),
921            (Some(inbound), None) => Some(inbound),
922            (None, Some(outbound)) => Some(outbound),
923            (None, None) => None,
924        };
925        let (outbound_queue_depth, outbound_queue_capacity) =
926            sender.sess_core.outbound_queue_stats();
927        VoxDebugSnapshot {
928            connections: vec![ConnectionDebugSnapshot {
929                connection_id: self.connection_id,
930                endpoint: None,
931                surface: None,
932                component: None,
933                state,
934                outstanding_requests: requests.len(),
935                requests,
936                open_channels,
937                outbound_queue_depth: Some(outbound_queue_depth),
938                outbound_queue_capacity: Some(outbound_queue_capacity),
939                local_control_queue_depth: None,
940                local_control_queue_capacity: None,
941                last_inbound_message_at,
942                last_outbound_message_at,
943                last_progress_at,
944                close_reason: *self.close_reason.lock(),
945                driver_task_status,
946            }],
947        }
948    }
949
950    fn shared_channel_credit_snapshot(&self) -> BTreeMap<ChannelId, Option<u32>> {
951        self.channel_credits
952            .lock()
953            .iter()
954            .map(|(channel_id, semaphore)| {
955                (
956                    *channel_id,
957                    Some(semaphore.available_permits().min(u32::MAX as usize) as u32),
958                )
959            })
960            .collect()
961    }
962
963    fn set_connection_closed(&self, reason: ConnectionCloseReason) {
964        *self.close_reason.lock() = Some(reason);
965    }
966
967    fn connection_debug_state(&self, closed: bool) -> ConnectionDebugState {
968        if closed {
969            ConnectionDebugState::Closed
970        } else {
971            ConnectionDebugState::Open
972        }
973    }
974}
975
976struct CallerDropGuard {
977    control_tx: mpsc::UnboundedSender<DropControlRequest>,
978    request: DropControlRequest,
979}
980
981impl Drop for CallerDropGuard {
982    fn drop(&mut self) {
983        let _ = self.control_tx.send(self.request);
984    }
985}
986
#[cfg(test)]
mod tests {
    use super::{DriverChannelCreditReplenisher, DriverLocalControl};
    use vox_types::{ChannelCreditReplenisher, ChannelId};

    /// With an initial window of 16, no grant is emitted for the first seven
    /// consumed items; the eighth triggers a single grant of 8.
    #[tokio::test]
    async fn replenisher_batches_at_half_the_initial_window() {
        let (control_tx, mut control_rx) =
            moire::sync::mpsc::unbounded_channel("test.replenisher");
        let replenisher = DriverChannelCreditReplenisher::new(
            vox_types::ConnectionId::ROOT,
            ChannelId(7),
            None,
            std::sync::Weak::new(),
            16,
            control_tx,
            None,
        );

        for _consumed in 0..7 {
            replenisher.on_item_consumed();
        }
        let premature = vox_types::time::tokio::timeout(
            std::time::Duration::from_millis(20),
            control_rx.recv(),
        )
        .await;
        assert!(
            premature.is_err(),
            "should not emit credit before reaching the batch threshold"
        );

        replenisher.on_item_consumed();
        match control_rx.recv().await {
            Some(DriverLocalControl::GrantCredit {
                channel_id,
                additional,
            }) => {
                assert_eq!(channel_id, ChannelId(7));
                assert_eq!(additional, 8);
            }
            _ => panic!("expected batched credit grant"),
        }
    }

    /// With an initial window of 1, every consumed item is granted back
    /// immediately.
    #[tokio::test]
    async fn replenisher_grants_one_by_one_for_single_credit_windows() {
        let (control_tx, mut control_rx) =
            moire::sync::mpsc::unbounded_channel("test.replenisher.single");
        let replenisher = DriverChannelCreditReplenisher::new(
            vox_types::ConnectionId::ROOT,
            ChannelId(9),
            None,
            std::sync::Weak::new(),
            1,
            control_tx,
            None,
        );

        replenisher.on_item_consumed();
        match control_rx.recv().await {
            Some(DriverLocalControl::GrantCredit {
                channel_id,
                additional,
            }) => {
                assert_eq!(channel_id, ChannelId(9));
                assert_eq!(additional, 1);
            }
            _ => panic!("expected immediate credit grant"),
        }
    }
}
1052
/// Concrete `ReplySink` implementation for the driver.
///
/// If dropped without `send_reply` being called, automatically sends
/// `VoxError::Cancelled` to the caller. This guarantees that every
/// request attempt receives exactly one terminal response
/// (`rpc.response.one-per-request`), even if the handler panics or
/// forgets to reply.
pub struct DriverReplySink {
    /// Connection sender used for the reply. `send_reply` takes it out of
    /// the option; the `Drop` impl only reports a failure when it is still
    /// present.
    sender: Option<ConnectionSender>,
    /// Id of the request this sink answers.
    request_id: RequestId,
    /// Method the request was addressed to.
    method_id: vox_types::MethodId,
    /// Retry policy of the method; `retry.persist` selects the failure
    /// disposition reported on drop.
    retry: vox_types::RetryPolicy,
    /// Operation id attached to the request, if any. Together with
    /// `operations` it makes `send_reply` seal the encoded response.
    operation_id: Option<OperationId>,
    /// Operation store the encoded response is sealed into by `send_reply`.
    operations: Option<Arc<dyn OperationStore>>,
    /// Tracker consulted for the waiters of an operation, both when the
    /// response is sealed and when the sink is dropped without a reply.
    live_operations: Option<Arc<SyncMutex<LiveOperationTracker>>>,
    /// Binder returned by `channel_binder()`; its shared state is also used
    /// to mark outbound progress when a reply is sent.
    binder: DriverChannelBinder,
    /// Static `&'static Shape` of the method's response type. Used on
    /// replay to derive the schemas to attach to the wire response —
    /// the same source of truth that fresh responses use.
    handler_response_shape: Option<&'static facet_core::Shape>,
}
1074
1075/// Replay a sealed response from the operation store.
1076///
1077/// The stored bytes do NOT contain schemas. Schemas are sourced from the
1078/// operation store via the send tracker, which deduplicates against what
1079/// was already sent on this connection.
1080async fn replay_sealed_response(
1081    sender: ConnectionSender,
1082    request_id: RequestId,
1083    method_id: vox_types::MethodId,
1084    encoded_response: &[u8],
1085    response_shape: Option<&'static facet_core::Shape>,
1086) -> Result<(), ()> {
1087    let mut response: RequestResponse<'_> =
1088        vox_postcard::from_slice_borrowed(encoded_response).map_err(|_| ())?;
1089    if let Some(shape) = response_shape {
1090        sender.prepare_replay_schemas(request_id, method_id, shape, &mut response);
1091    } else {
1092        response.schemas = Default::default();
1093    }
1094    sender.send_response(request_id, response).await
1095}
1096
1097fn incoming_args_bytes<'a>(call: &'a RequestCall<'a>) -> &'a [u8] {
1098    match &call.args {
1099        Payload::PostcardBytes(bytes) => bytes,
1100        Payload::Value { .. } => {
1101            panic!("incoming request payload should always be decoded as incoming bytes")
1102        }
1103    }
1104}
1105
impl ReplySink for DriverReplySink {
    /// Sends `response` as the terminal reply for this request.
    ///
    /// The connection sender is taken out of `self` first, so the `Drop`
    /// impl that runs afterwards finds no sender and reports nothing.
    ///
    /// Two paths exist:
    /// - With both an operation id and an operation store: the response is
    ///   prepared for the method, encoded without its schemas for storage,
    ///   sent on the wire with schemas, sealed into the store, and finally
    ///   replayed to every other waiter returned by the live tracker.
    /// - Otherwise the response is sent directly for the method.
    ///
    /// A failed send marks the affected request as
    /// `FailureDisposition::Cancelled`.
    async fn send_reply(mut self, response: RequestResponse<'_>) {
        let sender = self
            .sender
            .take()
            .expect("unreachable: send_reply takes self by value");

        vox_types::dlog!(
            "[driver] send_reply: conn={:?} req={:?} method={:?} payload={} operation_id={:?}",
            sender.connection_id(),
            self.request_id,
            self.method_id,
            match &response.ret {
                Payload::Value { .. } => "Value",
                Payload::PostcardBytes(_) => "PostcardBytes",
            },
            self.operation_id
        );
        tracing::debug!(
            conn_id = ?sender.connection_id(),
            req_id = ?self.request_id,
            method_id = ?self.method_id,
            operation_id = ?self.operation_id,
            payload = match &response.ret {
                Payload::Value { .. } => "value",
                Payload::PostcardBytes(_) => "postcard-bytes",
            },
            "vox driver sending reply"
        );
        self.binder.shared.mark_outbound_progress();

        // Diagnostic only: the extracted schemas are used for logging here.
        if let Payload::Value { shape, .. } = &response.ret
            && let Ok(extracted) = vox_types::extract_schemas(shape)
        {
            vox_types::dlog!(
                "[schema] driver send_reply: method={:?} root={:?}",
                self.method_id,
                extracted.root
            );
        }

        if let (Some(operation_id), Some(operations)) = (self.operation_id, self.operations.take())
        {
            let mut response = response;
            sender.prepare_response_for_method(self.request_id, self.method_id, &mut response);

            // Temporarily move the schemas out so the bytes encoded for the
            // store do not contain them; they are put back before sending.
            let schemas_for_wire = std::mem::take(&mut response.schemas);
            #[cfg(not(target_arch = "wasm32"))]
            let encoded_bytes: Vec<u8> =
                vox_jit::encode!(&response).expect("JIT encode failed for response store");
            #[cfg(target_arch = "wasm32")]
            let encoded_bytes: Vec<u8> =
                vox_postcard::to_vec(&response).expect("postcard encode failed for response store");
            let encoded_for_store: PostcardPayload = encoded_bytes.into();
            response.schemas = schemas_for_wire;

            // Send the full response (with schemas) on the wire.
            vox_types::dlog!(
                "[driver] send_reply wire send: conn={:?} req={:?} method={:?} schemas={}",
                sender.connection_id(),
                self.request_id,
                self.method_id,
                response.schemas.0.len()
            );
            if let Err(_e) = sender.send_response(self.request_id, response).await {
                tracing::debug!(
                    conn_id = ?sender.connection_id(),
                    req_id = ?self.request_id,
                    method_id = ?self.method_id,
                    "vox driver reply send failed"
                );
                sender.mark_failure(self.request_id, FailureDisposition::Cancelled);
            }

            // Seal: just the (op_id, method_id, bytes) tuple. No schemas
            // — they come from the running code at replay time.
            // Note that sealing happens whether or not the wire send above
            // succeeded.
            operations.seal(operation_id, self.method_id, &encoded_for_store);

            // Get waiters from the live tracker and replay to them.
            let waiters = self
                .live_operations
                .as_ref()
                .map(|lo| lo.lock().seal(operation_id))
                .unwrap_or_default();
            let response_shape = self.handler_response_shape;
            for waiter in waiters {
                // This request already received the response on the wire.
                if waiter == self.request_id {
                    continue;
                }
                if replay_sealed_response(
                    sender.clone(),
                    waiter,
                    self.method_id,
                    encoded_for_store.as_bytes(),
                    response_shape,
                )
                .await
                .is_err()
                {
                    sender.mark_failure(waiter, FailureDisposition::Cancelled);
                }
            }
        } else {
            vox_types::dlog!(
                "[driver] send_reply direct send: conn={:?} req={:?} method={:?}",
                sender.connection_id(),
                self.request_id,
                self.method_id
            );
            if let Err(_e) = sender
                .send_response_for_method(self.request_id, self.method_id, response)
                .await
            {
                tracing::debug!(
                    conn_id = ?sender.connection_id(),
                    req_id = ?self.request_id,
                    method_id = ?self.method_id,
                    "vox driver reply send failed"
                );
                sender.mark_failure(self.request_id, FailureDisposition::Cancelled);
            }
        }
    }

    /// Returns the binder used to bind channels for this request.
    fn channel_binder(&self) -> Option<&dyn ChannelBinder> {
        Some(&self.binder)
    }

    /// Returns the id of the request this sink replies to.
    fn request_id(&self) -> Option<RequestId> {
        Some(self.request_id)
    }

    /// Returns the connection id of the sender, or `None` once the sender
    /// has been taken.
    fn connection_id(&self) -> Option<vox_types::ConnectionId> {
        self.sender.as_ref().map(|sender| sender.connection_id())
    }
}
1242
1243// r[impl rpc.response.one-per-request]
1244impl Drop for DriverReplySink {
1245    fn drop(&mut self) {
1246        if let Some(sender) = self.sender.take() {
1247            let disposition = if self.retry.persist {
1248                FailureDisposition::Indeterminate
1249            } else {
1250                FailureDisposition::Cancelled
1251            };
1252
1253            if let Some(operation_id) = self.operation_id {
1254                // Don't remove from persistent store — non-idem ops stay as
1255                // Admitted so future lookups return Indeterminate. Idem ops
1256                // were never admitted to the store in the first place.
1257
1258                // Release waiters from the live tracker.
1259                if let Some(live_ops) = self.live_operations.take()
1260                    && let Some(live) = live_ops.lock().release(operation_id)
1261                {
1262                    for waiter in live.waiters {
1263                        sender.mark_failure(waiter, disposition);
1264                    }
1265                    return;
1266                }
1267            }
1268
1269            sender.mark_failure(self.request_id, disposition);
1270        }
1271    }
1272}
1273
// r[impl rpc.channel.item]
// r[impl rpc.channel.close]
/// Concrete [`ChannelSink`] backed by a `ConnectionSender`.
///
/// Created by the driver when setting up outbound channels (Tx handles).
/// Sends `ChannelItem` and `ChannelClose` messages through the connection.
/// Wrapped with [`CreditSink`] to enforce credit-based flow control.
pub struct DriverChannelSink {
    /// Connection sender that items and close messages are written to.
    sender: ConnectionSender,
    /// Driver state shared with this sink: the set of terminal channels,
    /// progress timestamps, channel debug records and the observer.
    shared: Arc<DriverShared>,
    /// Id of the channel this sink writes to.
    channel_id: ChannelId,
    /// Debug context attached to this sink, if any. `debug_context()` falls
    /// back to the context stored in `shared.channel_contexts` when this
    /// yields nothing.
    debug_context: Option<ChannelDebugContext>,
    /// Local control queue; `close_channel_on_drop` uses it to request a
    /// `CloseChannel` without awaiting.
    local_control_tx: mpsc::UnboundedSender<DriverLocalControl>,
}
1288
1289impl ChannelSink for DriverChannelSink {
1290    fn send_payload<'payload>(
1291        &self,
1292        payload: Payload<'payload>,
1293    ) -> Pin<Box<dyn vox_types::MaybeSendFuture<Output = Result<(), TxError>> + 'payload>> {
1294        let sender = self.sender.clone();
1295        let shared = Arc::clone(&self.shared);
1296        let channel_id = self.channel_id;
1297        Box::pin(async move {
1298            if shared.terminal_channels.lock().contains(&channel_id) {
1299                return Err(TxError::Transport("channel closed".into()));
1300            }
1301
1302            shared.mark_outbound_progress();
1303            sender
1304                .send(ConnectionMessage::Channel(ChannelMessage {
1305                    id: channel_id,
1306                    body: ChannelBody::Item(ChannelItem { item: payload }),
1307                }))
1308                .await
1309                .map_err(|()| TxError::Transport("connection closed".into()))
1310        })
1311    }
1312
1313    fn channel_id(&self) -> Option<ChannelId> {
1314        Some(self.channel_id)
1315    }
1316
1317    fn connection_id(&self) -> Option<vox_types::ConnectionId> {
1318        Some(self.sender.connection_id())
1319    }
1320
1321    fn debug_context(&self) -> Option<ChannelDebugContext> {
1322        self.debug_context
1323            .and_then(ChannelDebugContext::into_option)
1324            .or_else(|| {
1325                self.shared
1326                    .channel_contexts
1327                    .lock()
1328                    .get(&self.channel_id)
1329                    .copied()
1330            })
1331    }
1332
1333    fn observer(&self) -> Option<VoxObserverHandle> {
1334        self.shared.observer.clone()
1335    }
1336
1337    fn note_send_started(&self) {
1338        self.shared.record_send_started(self.channel_id);
1339    }
1340
1341    fn note_send_waiting_for_credit(&self) {
1342        self.shared.record_send_waiting_for_credit(self.channel_id);
1343    }
1344
1345    fn note_send_finished(&self, outcome: ChannelSendOutcome) {
1346        self.shared.record_send_finished(self.channel_id, outcome);
1347    }
1348
1349    fn note_try_send_outcome(&self, outcome: ChannelTrySendOutcome) {
1350        self.shared
1351            .record_try_send_outcome(self.channel_id, outcome);
1352    }
1353
1354    // r[impl rpc.flow-control.credit.try-send]
1355    // r[impl rpc.observability.channel.try-send-detail]
1356    fn try_send_payload_with_outcome<'payload>(
1357        &self,
1358        payload: Payload<'payload>,
1359    ) -> Result<(), ChannelTrySendOutcome> {
1360        if self
1361            .shared
1362            .terminal_channels
1363            .lock()
1364            .contains(&self.channel_id)
1365        {
1366            return Err(ChannelTrySendOutcome::Closed);
1367        }
1368
1369        self.shared.mark_outbound_progress();
1370        self.sender
1371            .try_send(ConnectionMessage::Channel(ChannelMessage {
1372                id: self.channel_id,
1373                body: ChannelBody::Item(ChannelItem { item: payload }),
1374            }))
1375            .map_err(|err| match err {
1376                TrySendError::Closed(()) => ChannelTrySendOutcome::Closed,
1377                TrySendError::Full(()) => ChannelTrySendOutcome::FullRuntimeQueue,
1378            })
1379    }
1380
1381    fn close_channel(
1382        &self,
1383        _metadata: vox_types::Metadata,
1384    ) -> Pin<Box<dyn vox_types::MaybeSendFuture<Output = Result<(), TxError>> + 'static>> {
1385        // [FIXME] ChannelSink::close_channel takes borrowed Metadata but returns 'static future.
1386        // We drop the borrowed metadata and send an empty one. This matches the [FIXME] in the
1387        // trait definition — the signature needs to be fixed to take owned metadata.
1388        let sender = self.sender.clone();
1389        let shared = Arc::clone(&self.shared);
1390        let channel_id = self.channel_id;
1391        let debug_context = self.debug_context;
1392        Box::pin(async move {
1393            shared.terminal_channels.lock().insert(channel_id);
1394            shared.observe_channel(channel_id, debug_context, |channel| ChannelEvent::Closed {
1395                channel,
1396                reason: ChannelCloseReason::Local,
1397            });
1398
1399            shared.mark_outbound_progress();
1400            sender
1401                .send(ConnectionMessage::Channel(ChannelMessage {
1402                    id: channel_id,
1403                    body: ChannelBody::Close(ChannelClose {
1404                        metadata: Default::default(),
1405                    }),
1406                }))
1407                .await
1408                .map_err(|()| TxError::Transport("connection closed".into()))
1409        })
1410    }
1411
1412    fn close_channel_on_drop(&self) {
1413        self.shared.terminal_channels.lock().insert(self.channel_id);
1414        self.shared
1415            .observe_channel(self.channel_id, self.debug_context, |channel| {
1416                ChannelEvent::Closed {
1417                    channel,
1418                    reason: ChannelCloseReason::Dropped,
1419                }
1420            });
1421        let _ = self
1422            .local_control_tx
1423            .send(DriverLocalControl::CloseChannel {
1424                channel_id: self.channel_id,
1425            });
1426    }
1427}
1428
/// Object-safe version of [`Handler<DriverReplySink>`].
///
/// Boxes the future returned by `handle()` so the trait is dyn-safe.
/// Implemented automatically for any `Handler<DriverReplySink>`.
pub trait ErasedHandler: MaybeSend + MaybeSync + 'static {
    /// Retry policy for `method_id`. The default ignores the method and
    /// returns `RetryPolicy::VOLATILE`.
    fn retry_policy(&self, method_id: vox_types::MethodId) -> vox_types::RetryPolicy {
        let _ = method_id;
        vox_types::RetryPolicy::VOLATILE
    }

    /// Whether the arguments of `method_id` contain channels. The default
    /// ignores the method and returns `false`.
    fn args_have_channels(&self, method_id: vox_types::MethodId) -> bool {
        let _ = method_id;
        false
    }

    /// Shape of the response as sent on the wire for `method_id`. The
    /// default ignores the method and returns `None`.
    fn response_wire_shape(&self, method_id: vox_types::MethodId) -> Option<&'static facet::Shape> {
        let _ = method_id;
        None
    }

    /// Handles one call, returning the handler's future in boxed form.
    fn handle_erased(
        &self,
        call: SelfRef<RequestCall<'static>>,
        reply: DriverReplySink,
        schemas: std::sync::Arc<vox_types::SchemaRecvTracker>,
    ) -> BoxFut<'_, ()>;
}
1456
1457impl<H: Handler<DriverReplySink>> ErasedHandler for H {
1458    fn retry_policy(&self, method_id: vox_types::MethodId) -> vox_types::RetryPolicy {
1459        Handler::retry_policy(self, method_id)
1460    }
1461
1462    fn args_have_channels(&self, method_id: vox_types::MethodId) -> bool {
1463        Handler::args_have_channels(self, method_id)
1464    }
1465
1466    fn response_wire_shape(&self, method_id: vox_types::MethodId) -> Option<&'static facet::Shape> {
1467        Handler::response_wire_shape(self, method_id)
1468    }
1469
1470    fn handle_erased(
1471        &self,
1472        call: SelfRef<RequestCall<'static>>,
1473        reply: DriverReplySink,
1474        schemas: std::sync::Arc<vox_types::SchemaRecvTracker>,
1475    ) -> BoxFut<'_, ()> {
1476        Box::pin(Handler::handle(self, call, reply, schemas))
1477    }
1478}
1479
1480impl Handler<DriverReplySink> for Box<dyn ErasedHandler> {
1481    fn retry_policy(&self, method_id: vox_types::MethodId) -> vox_types::RetryPolicy {
1482        (**self).retry_policy(method_id)
1483    }
1484
1485    fn args_have_channels(&self, method_id: vox_types::MethodId) -> bool {
1486        (**self).args_have_channels(method_id)
1487    }
1488
1489    fn response_wire_shape(&self, method_id: vox_types::MethodId) -> Option<&'static facet::Shape> {
1490        (**self).response_wire_shape(method_id)
1491    }
1492
1493    async fn handle(
1494        &self,
1495        call: SelfRef<RequestCall<'static>>,
1496        reply: DriverReplySink,
1497        schemas: std::sync::Arc<vox_types::SchemaRecvTracker>,
1498    ) {
1499        (**self).handle_erased(call, reply, schemas).await
1500    }
1501}
1502
/// Concrete caller type wrapping a [`DriverCaller`] with optional middleware.
///
/// This is the primary type for making outbound RPC calls. Generated `*Client`
/// types store a `Caller` as a public field. Use `with_middleware()` to add
/// client middleware to the call chain.
#[must_use = "Dropping this caller may close the connection if it is the last caller."]
#[derive(Clone)]
pub struct Caller {
    /// Shared driver-level caller that performs the actual requests.
    inner: Arc<DriverCaller>,
    /// Service descriptor recorded by the first `with_middleware()` call;
    /// `None` means calls bypass the middleware path entirely.
    service: Option<&'static vox_types::ServiceDescriptor>,
    /// Middleware chain: `pre` hooks run in this order, `post` hooks in
    /// reverse order.
    middlewares: Vec<Arc<dyn vox_types::ClientMiddleware>>,
}
1515
1516impl Caller {
1517    /// Create a new `Caller` wrapping a [`DriverCaller`].
1518    pub fn new(driver: DriverCaller) -> Self {
1519        Self {
1520            inner: Arc::new(driver),
1521            service: None,
1522            middlewares: vec![],
1523        }
1524    }
1525
1526    /// Access the underlying [`DriverCaller`] for low-level operations.
1527    #[cfg(test)]
1528    pub(crate) fn driver(&self) -> &DriverCaller {
1529        &self.inner
1530    }
1531
1532    /// Append a client middleware to this caller's chain.
1533    pub fn with_middleware(
1534        mut self,
1535        service: &'static vox_types::ServiceDescriptor,
1536        middleware: impl vox_types::ClientMiddleware,
1537    ) -> Self {
1538        if let Some(existing_service) = self.service {
1539            assert_eq!(
1540                existing_service.service_name, service.service_name,
1541                "Caller middleware service mismatch"
1542            );
1543        } else {
1544            self.service = Some(service);
1545        }
1546        self.middlewares.push(Arc::new(middleware));
1547        self
1548    }
1549
1550    /// Start one outgoing request attempt and wait for its response,
1551    /// running any registered middleware around the call.
1552    pub async fn call(&self, mut call: RequestCall<'_>) -> CallResult {
1553        use vox_types::{
1554            ClientCallOutcome, ClientContext, ClientRequest, Extensions, OwnedMetadata,
1555        };
1556
1557        let Some(service) = self.service else {
1558            return self.inner.call_inner(call, None).await;
1559        };
1560
1561        let extensions = Extensions::new();
1562        let method = service.by_id(call.method_id);
1563        let context = ClientContext::new(method, call.method_id, &extensions);
1564        let mut owned_metadata = OwnedMetadata::default();
1565
1566        if !self.middlewares.is_empty() {
1567            for middleware in &self.middlewares {
1568                let mut request = ClientRequest::new(&mut call, &mut owned_metadata);
1569                middleware.pre(&context, &mut request).await;
1570            }
1571        }
1572
1573        let request_debug = method.map(|method| (method.service_name, method.method_name));
1574        let result = self.inner.call_inner(call, request_debug).await;
1575        if !self.middlewares.is_empty() {
1576            let outcome = match &result {
1577                Ok(_) => ClientCallOutcome::Response,
1578                Err(error) => ClientCallOutcome::Error(error),
1579            };
1580            for middleware in self.middlewares.iter().rev() {
1581                middleware.post(&context, outcome).await;
1582            }
1583        }
1584        result
1585    }
1586
1587    /// Resolve when the underlying connection closes.
1588    pub async fn closed(&self) {
1589        if self.inner.closed_rx.borrow().is_some() {
1590            return;
1591        }
1592        let mut rx = self.inner.closed_rx.clone();
1593        while rx.changed().await.is_ok() {
1594            if rx.borrow().is_some() {
1595                return;
1596            }
1597        }
1598    }
1599
1600    /// Return whether the underlying connection is still considered connected.
1601    pub fn is_connected(&self) -> bool {
1602        self.inner.closed_rx.borrow().is_none()
1603    }
1604
1605    /// Return a channel binder for binding Tx/Rx handles in args before sending.
1606    pub fn channel_binder(&self) -> Option<&dyn ChannelBinder> {
1607        Some(self.inner.as_ref())
1608    }
1609
1610    // r[impl rpc.debug.snapshot]
1611    pub fn debug_snapshot(&self) -> VoxDebugSnapshot {
1612        self.inner.debug_snapshot()
1613    }
1614
1615    pub fn dump_debug_snapshot(&self) -> VoxDebugSnapshot {
1616        let snapshot = self.debug_snapshot();
1617        tracing::info!(?snapshot, "vox debug snapshot");
1618        snapshot
1619    }
1620}
1621
/// Trait for constructing a typed client from a vox session.
///
/// Generated `*Client` types implement this to receive both the caller
/// and an optional session handle. Root connections pass `Some(handle)`;
/// virtual connections pass `None`.
pub trait FromVoxSession {
    /// The service name for this client, used for automatic `vox-service` metadata.
    /// This is a plain string, not an `Option`: every implementor must
    /// provide a name. `NoopClient` uses `"Noop"`.
    const SERVICE_NAME: &'static str;

    /// Builds the client from its [`Caller`] and, when available, the
    /// session handle.
    fn from_vox_session(
        caller: Caller,
        session_handle: Option<crate::session::SessionHandle>,
    ) -> Self;
}
1637
/// Liveness-only client for a connection root.
///
/// Keeps the root connection alive but intentionally exposes no outbound RPC API.
/// Use this as the type parameter to `establish()` when you don't need a typed client.
///
/// Dropping the last caller may close the connection, hence `#[must_use]`.
#[must_use = "Dropping NoopClient may close the connection if it is the last caller."]
#[derive(Clone)]
pub struct NoopClient {
    /// The underlying caller keeping the connection alive.
    pub caller: Caller,
    /// The session handle, if this client is on a root connection.
    /// `None` when the client was built without a session handle.
    pub session: Option<crate::session::SessionHandle>,
}
1650
/// `NoopClient` simply stores the caller and session handle it is given.
impl FromVoxSession for NoopClient {
    /// Fixed service name reported for the no-op client.
    const SERVICE_NAME: &'static str = "Noop";

    /// Wrap the caller and optional session handle without any further setup.
    fn from_vox_session(caller: Caller, session: Option<crate::session::SessionHandle>) -> Self {
        Self { caller, session }
    }
}
1658
/// Channel binder backed by a driver's shared state.
///
/// Implements both `DriverChannelEndpoint` and `ChannelBinder`, so it can
/// create and bind Tx/Rx channels on the connection.
#[derive(Clone)]
struct DriverChannelBinder {
    /// Connection sender handed to the outbound channel sinks it creates.
    sender: ConnectionSender,
    /// Driver state shared with the rest of the connection.
    shared: Arc<DriverShared>,
    /// Sender for driver-local control messages (close/reset/grant credit).
    local_control_tx: mpsc::UnboundedSender<DriverLocalControl>,
    /// Optional caller drop guard; when present it is exposed as the
    /// channel liveness handle.
    drop_guard: Option<Arc<CallerDropGuard>>,
}
1666
1667fn register_rx_channel_impl(
1668    shared: &Arc<DriverShared>,
1669    channel_id: ChannelId,
1670    initial_channel_credit: u32,
1671    debug_context: Option<ChannelDebugContext>,
1672    liveness: Option<ChannelLivenessHandle>,
1673    local_control_tx: mpsc::UnboundedSender<DriverLocalControl>,
1674) -> vox_types::BoundChannelReceiver {
1675    observe_channel_opened(
1676        shared,
1677        channel_id,
1678        ChannelDirection::Rx,
1679        initial_channel_credit,
1680        debug_context,
1681    );
1682    let (rx, terminal) = shared.register_inbound_channel_receiver(channel_id);
1683
1684    if terminal {
1685        shared.channel_credits.lock().remove(&channel_id);
1686        return vox_types::BoundChannelReceiver {
1687            receiver: rx,
1688            liveness,
1689            replenisher: None,
1690        };
1691    }
1692
1693    vox_types::BoundChannelReceiver {
1694        receiver: rx,
1695        liveness,
1696        replenisher: Some(Arc::new(DriverChannelCreditReplenisher::new(
1697            shared.connection_id,
1698            channel_id,
1699            debug_context,
1700            Arc::downgrade(shared),
1701            initial_channel_credit,
1702            local_control_tx,
1703            shared.observer.clone(),
1704        )) as ChannelCreditReplenisherHandle),
1705    }
1706}
1707
// r[impl rpc.observability.channel]
/// Record a newly opened channel and report a `ChannelEvent::Opened` for it.
///
/// The debug context is remembered on the shared driver state before the
/// event (carrying the direction and initial credit) is reported.
fn observe_channel_opened(
    shared: &DriverShared,
    channel_id: ChannelId,
    direction: ChannelDirection,
    initial_credit: u32,
    debug_context: Option<ChannelDebugContext>,
) {
    shared.remember_channel_context(channel_id, debug_context);
    shared.observe_channel(channel_id, debug_context, |channel| ChannelEvent::Opened {
        channel,
        direction,
        initial_credit,
    });
}
1723
1724fn make_tx_channel_sink(
1725    sender: &ConnectionSender,
1726    shared: &Arc<DriverShared>,
1727    local_control_tx: &mpsc::UnboundedSender<DriverLocalControl>,
1728    channel_id: ChannelId,
1729    debug_context: Option<ChannelDebugContext>,
1730) -> Arc<CreditSink<DriverChannelSink>> {
1731    observe_channel_opened(
1732        shared,
1733        channel_id,
1734        ChannelDirection::Tx,
1735        shared.peer_initial_channel_credit,
1736        debug_context,
1737    );
1738    let inner = DriverChannelSink {
1739        sender: sender.clone(),
1740        shared: Arc::clone(shared),
1741        channel_id,
1742        debug_context: debug_context.and_then(ChannelDebugContext::into_option),
1743        local_control_tx: local_control_tx.clone(),
1744    };
1745    let sink = Arc::new(CreditSink::new(inner, shared.peer_initial_channel_credit));
1746    shared
1747        .channel_credits
1748        .lock()
1749        .insert(channel_id, Arc::clone(sink.credit()));
1750    sink
1751}
1752
1753trait DriverChannelEndpoint {
1754    fn endpoint_sender(&self) -> &ConnectionSender;
1755    fn endpoint_shared(&self) -> &Arc<DriverShared>;
1756    fn endpoint_local_control_tx(&self) -> &mpsc::UnboundedSender<DriverLocalControl>;
1757    fn endpoint_liveness(&self) -> Option<ChannelLivenessHandle>;
1758
1759    fn create_tx_credit_sink(
1760        &self,
1761        debug_context: Option<ChannelDebugContext>,
1762    ) -> (ChannelId, Arc<CreditSink<DriverChannelSink>>) {
1763        let shared = self.endpoint_shared();
1764        let channel_id = shared.channel_ids.lock().alloc();
1765        let sink = make_tx_channel_sink(
1766            self.endpoint_sender(),
1767            shared,
1768            self.endpoint_local_control_tx(),
1769            channel_id,
1770            debug_context,
1771        );
1772        (channel_id, sink)
1773    }
1774
1775    fn create_tx_dyn(
1776        &self,
1777        debug_context: Option<ChannelDebugContext>,
1778    ) -> (ChannelId, Arc<dyn ChannelSink>) {
1779        let (id, sink) = self.create_tx_credit_sink(debug_context);
1780        (id, sink as Arc<dyn ChannelSink>)
1781    }
1782
1783    fn create_rx_bound(
1784        &self,
1785        debug_context: Option<ChannelDebugContext>,
1786    ) -> (ChannelId, vox_types::BoundChannelReceiver) {
1787        let channel_id = self.endpoint_shared().channel_ids.lock().alloc();
1788        let rx = self.register_rx_bound(channel_id, debug_context);
1789        (channel_id, rx)
1790    }
1791
1792    fn bind_tx_dyn(
1793        &self,
1794        channel_id: ChannelId,
1795        debug_context: Option<ChannelDebugContext>,
1796    ) -> Arc<dyn ChannelSink> {
1797        make_tx_channel_sink(
1798            self.endpoint_sender(),
1799            self.endpoint_shared(),
1800            self.endpoint_local_control_tx(),
1801            channel_id,
1802            debug_context,
1803        )
1804    }
1805
1806    fn register_rx_bound(
1807        &self,
1808        channel_id: ChannelId,
1809        debug_context: Option<ChannelDebugContext>,
1810    ) -> vox_types::BoundChannelReceiver {
1811        let shared = self.endpoint_shared();
1812        register_rx_channel_impl(
1813            shared,
1814            channel_id,
1815            shared.local_initial_channel_credit,
1816            debug_context,
1817            self.endpoint_liveness(),
1818            self.endpoint_local_control_tx().clone(),
1819        )
1820    }
1821}
1822
1823impl DriverChannelEndpoint for DriverChannelBinder {
1824    fn endpoint_sender(&self) -> &ConnectionSender {
1825        &self.sender
1826    }
1827
1828    fn endpoint_shared(&self) -> &Arc<DriverShared> {
1829        &self.shared
1830    }
1831
1832    fn endpoint_local_control_tx(&self) -> &mpsc::UnboundedSender<DriverLocalControl> {
1833        &self.local_control_tx
1834    }
1835
1836    fn endpoint_liveness(&self) -> Option<ChannelLivenessHandle> {
1837        self.drop_guard
1838            .as_ref()
1839            .map(|guard| guard.clone() as ChannelLivenessHandle)
1840    }
1841}
1842
1843impl ChannelBinder for DriverChannelBinder {
1844    fn create_tx(&self) -> (ChannelId, Arc<dyn ChannelSink>) {
1845        self.create_tx_dyn(None)
1846    }
1847
1848    fn create_tx_with_context(
1849        &self,
1850        debug_context: Option<ChannelDebugContext>,
1851    ) -> (ChannelId, Arc<dyn ChannelSink>) {
1852        self.create_tx_dyn(debug_context)
1853    }
1854
1855    fn create_rx(&self) -> (ChannelId, vox_types::BoundChannelReceiver) {
1856        self.create_rx_bound(None)
1857    }
1858
1859    fn create_rx_with_context(
1860        &self,
1861        debug_context: Option<ChannelDebugContext>,
1862    ) -> (ChannelId, vox_types::BoundChannelReceiver) {
1863        self.create_rx_bound(debug_context)
1864    }
1865
1866    fn bind_tx(&self, channel_id: ChannelId) -> Arc<dyn ChannelSink> {
1867        self.bind_tx_dyn(channel_id, None)
1868    }
1869
1870    fn bind_tx_with_context(
1871        &self,
1872        channel_id: ChannelId,
1873        debug_context: Option<ChannelDebugContext>,
1874    ) -> Arc<dyn ChannelSink> {
1875        self.bind_tx_dyn(channel_id, debug_context)
1876    }
1877
1878    fn register_rx(&self, channel_id: ChannelId) -> vox_types::BoundChannelReceiver {
1879        self.register_rx_bound(channel_id, None)
1880    }
1881
1882    fn register_rx_with_context(
1883        &self,
1884        channel_id: ChannelId,
1885        debug_context: Option<ChannelDebugContext>,
1886    ) -> vox_types::BoundChannelReceiver {
1887        self.register_rx_bound(channel_id, debug_context)
1888    }
1889
1890    fn channel_liveness(&self) -> Option<ChannelLivenessHandle> {
1891        self.endpoint_liveness()
1892    }
1893}
1894
/// Allocates a request ID, registers a response slot,
/// sends one request attempt through the connection, and awaits the
/// corresponding response.
#[derive(Clone)]
pub struct DriverCaller {
    /// Connection sender used for outbound requests and channel sinks.
    sender: ConnectionSender,
    /// Driver state shared with the rest of the connection.
    shared: Arc<DriverShared>,
    /// Sender for driver-local control messages.
    local_control_tx: mpsc::UnboundedSender<DriverLocalControl>,
    /// Holds `Some(reason)` once the connection has closed.
    closed_rx: watch::Receiver<Option<ConnectionCloseReason>>,
    /// Resume generation counter; a change triggers the resume handling in
    /// `call_inner`.
    resumed_rx: watch::Receiver<u64>,
    /// Highest resume generation already processed; `call_inner` waits for
    /// this to catch up before re-sending a request.
    resume_processed_rx: watch::Receiver<u64>,
    /// Whether the peer supports retry; gates operation IDs and the
    /// re-send-after-resume path.
    peer_supports_retry: bool,
    /// Optional caller drop guard; also exposed as the channel liveness handle.
    _drop_guard: Option<Arc<CallerDropGuard>>,
}
1909
1910impl DriverCaller {
1911    /// Allocate a channel ID and create a credit-controlled sink for outbound items.
1912    ///
1913    /// The returned sink enforces credit; the semaphore is registered so
1914    /// `GrantCredit` messages can add permits.
1915    pub fn create_tx_channel(&self) -> (ChannelId, Arc<CreditSink<DriverChannelSink>>) {
1916        self.create_tx_credit_sink(None)
1917    }
1918
1919    /// Returns the underlying connection sender.
1920    ///
1921    /// Used by in-crate tests that need to inject raw messages for cancellation
1922    /// and channel protocol testing.
1923    #[cfg(test)]
1924    pub(crate) fn connection_sender(&self) -> &ConnectionSender {
1925        &self.sender
1926    }
1927
1928    /// Register an inbound channel (Rx on our side) and return the receiver.
1929    ///
1930    /// The channel ID comes from the peer (e.g. from `RequestCall.channels`).
1931    /// The returned receiver should be bound to an `Rx` handle via `Rx::bind()`.
1932    pub fn register_rx_channel(&self, channel_id: ChannelId) -> vox_types::BoundChannelReceiver {
1933        self.register_rx_bound(channel_id, None)
1934    }
1935}
1936
1937impl DriverChannelEndpoint for DriverCaller {
1938    fn endpoint_sender(&self) -> &ConnectionSender {
1939        &self.sender
1940    }
1941
1942    fn endpoint_shared(&self) -> &Arc<DriverShared> {
1943        &self.shared
1944    }
1945
1946    fn endpoint_local_control_tx(&self) -> &mpsc::UnboundedSender<DriverLocalControl> {
1947        &self.local_control_tx
1948    }
1949
1950    fn endpoint_liveness(&self) -> Option<ChannelLivenessHandle> {
1951        self._drop_guard
1952            .as_ref()
1953            .map(|guard| guard.clone() as ChannelLivenessHandle)
1954    }
1955}
1956
1957impl ChannelBinder for DriverCaller {
1958    fn create_tx(&self) -> (ChannelId, Arc<dyn ChannelSink>) {
1959        self.create_tx_dyn(None)
1960    }
1961
1962    fn create_tx_with_context(
1963        &self,
1964        debug_context: Option<ChannelDebugContext>,
1965    ) -> (ChannelId, Arc<dyn ChannelSink>) {
1966        self.create_tx_dyn(debug_context)
1967    }
1968
1969    fn create_rx(&self) -> (ChannelId, vox_types::BoundChannelReceiver) {
1970        self.create_rx_bound(None)
1971    }
1972
1973    fn create_rx_with_context(
1974        &self,
1975        debug_context: Option<ChannelDebugContext>,
1976    ) -> (ChannelId, vox_types::BoundChannelReceiver) {
1977        self.create_rx_bound(debug_context)
1978    }
1979
1980    fn bind_tx(&self, channel_id: ChannelId) -> Arc<dyn ChannelSink> {
1981        self.bind_tx_dyn(channel_id, None)
1982    }
1983
1984    fn bind_tx_with_context(
1985        &self,
1986        channel_id: ChannelId,
1987        debug_context: Option<ChannelDebugContext>,
1988    ) -> Arc<dyn ChannelSink> {
1989        self.bind_tx_dyn(channel_id, debug_context)
1990    }
1991
1992    fn register_rx(&self, channel_id: ChannelId) -> vox_types::BoundChannelReceiver {
1993        self.register_rx_bound(channel_id, None)
1994    }
1995
1996    fn register_rx_with_context(
1997        &self,
1998        channel_id: ChannelId,
1999        debug_context: Option<ChannelDebugContext>,
2000    ) -> vox_types::BoundChannelReceiver {
2001        self.register_rx_bound(channel_id, debug_context)
2002    }
2003
2004    fn channel_liveness(&self) -> Option<ChannelLivenessHandle> {
2005        self.endpoint_liveness()
2006    }
2007}
2008
2009impl DriverCaller {
2010    // r[impl rpc.debug.snapshot]
2011    pub fn debug_snapshot(&self) -> VoxDebugSnapshot {
2012        self.shared.debug_snapshot(
2013            &self.sender,
2014            self.shared
2015                .connection_debug_state(self.closed_rx.borrow().is_some()),
2016            if self.closed_rx.borrow().is_some() {
2017                DriverTaskStatus::Dead
2018            } else {
2019                DriverTaskStatus::Alive
2020            },
2021        )
2022    }
2023
2024    pub fn dump_debug_snapshot(&self) -> VoxDebugSnapshot {
2025        let snapshot = self.debug_snapshot();
2026        tracing::info!(?snapshot, "vox debug snapshot");
2027        snapshot
2028    }
2029
    /// Internal: perform a single outbound RPC call attempt (no middleware).
    ///
    /// Allocates a request ID, registers a response slot, sends the call and
    /// waits for the matching response. While waiting, the request is re-sent
    /// under the same request ID after a session resume (unless its channel
    /// retry mode is `NonIdem`), and the wait is abandoned when the
    /// connection closes.
    ///
    /// `request_debug` is an optional `(service, method)` name pair used for
    /// logging and request debug state; `"<unknown>"` is used when absent.
    ///
    /// # Errors
    ///
    /// - `VoxError::SendFailed` when the initial send fails.
    /// - `VoxError::ConnectionClosed` when the response slot is dropped or a
    ///   close is observed on `closed_rx`.
    /// - `VoxError::SessionShutdown` when one of the resume watch channels is
    ///   closed.
    /// - `VoxError::Indeterminate` when a resume happens while a `NonIdem`
    ///   call is in flight.
    async fn call_inner(
        &self,
        mut call: RequestCall<'_>,
        request_debug: Option<(&'static str, &'static str)>,
    ) -> CallResult {
        // Operation IDs are only attached when the peer supports retry.
        // NOTE(review): presumably `ensure_operation_id` keeps an ID already
        // present in the metadata; the counter is advanced either way.
        if self.peer_supports_retry {
            let operation_id = OperationId(
                self.shared
                    .next_operation_id
                    .fetch_add(1, Ordering::Relaxed),
            );
            ensure_operation_id(&mut call.metadata, operation_id);
        }

        // Allocate a request ID.
        let req_id = self.shared.request_ids.lock().alloc();
        let request_started_at = Instant::now();
        let (service_name, method_name) = request_debug.unwrap_or(("<unknown>", "<unknown>"));
        tracing::debug!(
            conn_id = ?self.sender.connection_id(),
            ?req_id,
            method_id = ?call.method_id,
            service = service_name,
            method = method_name,
            "vox caller starting request"
        );
        if let Some(observer) = &self.shared.observer {
            observer.driver_event(DriverEvent::RequestStarted {
                connection_id: self.sender.connection_id(),
                request_id: req_id,
                method_id: call.method_id,
            });
        }
        // Shared exit bookkeeping: records the final debug state (Finished for
        // `Ok`, Failed for everything else) and notifies the observer with the
        // outcome and elapsed time. Every return path below calls this once.
        let finish_request = |outcome: RpcOutcome| {
            self.shared.finish_request(
                req_id,
                if outcome == RpcOutcome::Ok {
                    RequestDebugState::Finished
                } else {
                    RequestDebugState::Failed
                },
            );
            if let Some(observer) = &self.shared.observer {
                observer.driver_event(DriverEvent::RequestFinished {
                    connection_id: self.sender.connection_id(),
                    request_id: req_id,
                    outcome,
                    elapsed: request_started_at.elapsed(),
                });
            }
        };

        // Register the response slot before sending, so the driver can
        // route the response even if it arrives before we start awaiting.
        let (tx, rx) = moire::sync::oneshot::channel("driver.response");
        self.shared.pending_responses.lock().insert(req_id, tx);
        self.shared.start_request(
            req_id,
            call.method_id,
            Some(service_name),
            Some(method_name),
            RequestDebugState::WaitingForResponse,
        );

        // r[impl schema.exchange.caller]
        // r[impl schema.exchange.channels]
        // Schemas are attached by SessionCore::send() when it sees a Call
        // with Payload::Value — no separate prepare step needed.
        //
        // Channel binding happens during serialization via the thread-local
        // ChannelBinder — no post-hoc walk needed.
        self.shared.mark_outbound_progress();
        tracing::debug!(
            conn_id = ?self.sender.connection_id(),
            ?req_id,
            method_id = ?call.method_id,
            service = service_name,
            method = method_name,
            "vox caller sending request"
        );
        // The call is rebuilt from borrowed args and cloned metadata so the
        // original `call` stays available for a re-send after resume.
        if self
            .sender
            .send_with_binder(
                ConnectionMessage::Request(RequestMessage {
                    id: req_id,
                    body: RequestBody::Call(RequestCall {
                        method_id: call.method_id,
                        args: call.args.reborrow(),
                        metadata: call.metadata.clone(),
                        schemas: Default::default(),
                    }),
                }),
                Some(self),
            )
            .await
            .is_err()
        {
            tracing::debug!(
                conn_id = ?self.sender.connection_id(),
                ?req_id,
                method_id = ?call.method_id,
                service = service_name,
                method = method_name,
                "vox caller request send failed"
            );
            // The request never went out: drop the response slot again.
            self.shared.pending_responses.lock().remove(&req_id);
            finish_request(RpcOutcome::SendFailed);
            return Err(VoxError::SendFailed);
        }
        tracing::debug!(
            conn_id = ?self.sender.connection_id(),
            ?req_id,
            method_id = ?call.method_id,
            service = service_name,
            method = method_name,
            "vox caller request sent; waiting for response"
        );

        // Private clones of the watch receivers, so waiting on them here does
        // not touch the receivers stored on `self`.
        let mut resumed_rx = self.resumed_rx.clone();
        let mut seen_resume_generation = *resumed_rx.borrow();
        let mut resume_processed_rx = self.resume_processed_rx.clone();
        let mut closed_rx = self.closed_rx.clone();
        let mut response = std::pin::pin!(rx.named("awaiting_response"));

        // Wait for whichever comes first: the response, a session resume
        // (only when the peer supports retry), or a connection close.
        let pending: PendingResponse = loop {
            tokio::select! {
                result = &mut response => {
                    match result {
                        Ok(pending) => {
                            tracing::debug!(
                                conn_id = ?self.sender.connection_id(),
                                ?req_id,
                                method_id = ?call.method_id,
                                service = service_name,
                                method = method_name,
                                "vox caller received response"
                            );
                            break pending;
                        }
                        // The sending half of the response slot was dropped
                        // without delivering a response.
                        Err(_) => {
                            finish_request(RpcOutcome::Closed);
                            return Err(VoxError::ConnectionClosed);
                        }
                    }
                }
                changed = resumed_rx.changed(), if self.peer_supports_retry => {
                    vox_types::dlog!("[CALLER] resumed_rx fired");
                    if changed.is_err() {
                        self.shared.pending_responses.lock().remove(&req_id);
                        finish_request(RpcOutcome::Closed);
                        return Err(VoxError::SessionShutdown);
                    }
                    let generation = *resumed_rx.borrow();
                    // Ignore notifications that do not advance the generation.
                    if generation == seen_resume_generation {
                        continue;
                    }
                    seen_resume_generation = generation;
                    // Hold off until the processed-generation watch has caught
                    // up with this resume generation.
                    while *resume_processed_rx.borrow() < generation {
                        if resume_processed_rx.changed().await.is_err() {
                            self.shared.pending_responses.lock().remove(&req_id);
                            finish_request(RpcOutcome::Closed);
                            return Err(VoxError::SessionShutdown);
                        }
                    }
                    // `NonIdem` calls are not re-sent; they fail as
                    // indeterminate instead.
                    match metadata_channel_retry_mode(&call.metadata) {
                        ChannelRetryMode::NonIdem => {
                            self.shared.pending_responses.lock().remove(&req_id);
                            finish_request(RpcOutcome::Indeterminate);
                            return Err(VoxError::Indeterminate);
                        }
                        ChannelRetryMode::Idem | ChannelRetryMode::None => {}
                    }
                    // Re-send the request after resume.
                    // Channel binding is embedded in the serialized payload,
                    // so no separate re-binding step is needed.
                    self.shared.mark_outbound_progress();
                    // A failed re-send is deliberately ignored here; the loop
                    // keeps waiting on the other branches.
                    let _ = self.sender.send_with_binder(
                        ConnectionMessage::Request(RequestMessage {
                            id: req_id,
                            body: RequestBody::Call(RequestCall {
                                method_id: call.method_id,
                                args: call.args.reborrow(),
                                metadata: call.metadata.clone(),
                                schemas: Default::default(),
                            }),
                        }),
                        Some(self),
                    ).await;
                }
                changed = closed_rx.changed() => {
                    vox_types::dlog!("[CALLER] closed_rx fired, value={:?}", *closed_rx.borrow());
                    // Either the watch sender is gone or a close reason was
                    // published; a change that leaves the value `None` is ignored.
                    if changed.is_err() || closed_rx.borrow().is_some() {
                        self.shared.pending_responses.lock().remove(&req_id);
                        finish_request(RpcOutcome::Closed);
                        return Err(VoxError::ConnectionClosed);
                    }
                }
            }
        };

        // Extract the Response variant from the RequestMessage.
        let PendingResponse {
            msg: response_msg,
            schemas: response_schemas,
            fds: response_fds,
        } = pending;
        let response = response_msg.map(|m| match m.body {
            RequestBody::Response(r) => r,
            _ => unreachable!("pending_responses only gets Response variants"),
        });

        finish_request(RpcOutcome::Ok);
        Ok(vox_types::WithTracker {
            value: response,
            tracker: response_schemas,
            fds: response_fds,
        })
    }
2249}
2250
// r[impl rpc.handler]
// r[impl rpc.request]
// r[impl rpc.response]
// r[impl rpc.pipelining]
/// Per-connection driver. Tracks in-flight request attempts, dispatches
/// incoming requests to a `Handler`, and manages channel state / flow control.
pub struct Driver<H: Handler<DriverReplySink>> {
    /// Outbound side of the connection, taken from the `ConnectionHandle`.
    sender: ConnectionSender,
    /// Inbound messages for this connection, taken from the `ConnectionHandle`.
    rx: mpsc::Receiver<crate::session::RecvMessage>,
    /// Per-request failure notifications `(request ID, disposition)`.
    /// NOTE(review): the producer side is outside this excerpt.
    failures_rx: mpsc::UnboundedReceiver<(RequestId, FailureDisposition)>,
    /// Holds `Some(reason)` once the connection has closed.
    closed_rx: watch::Receiver<Option<ConnectionCloseReason>>,
    /// Resume generation counter published by the session.
    resumed_rx: watch::Receiver<u64>,
    /// Sender half of the "resume processed" generation watch; starts at 0.
    resume_processed_tx: watch::Sender<u64>,
    /// Whether the peer supports retry, as reported by the `ConnectionHandle`.
    peer_supports_retry: bool,
    /// Receiving end of the driver-local control channel
    /// (`"driver.local_control"`).
    local_control_rx: mpsc::UnboundedReceiver<DriverLocalControl>,
    /// The handler that incoming requests are dispatched to.
    handler: Arc<H>,
    /// State shared with callers, channel binders and channel sinks.
    shared: Arc<DriverShared>,
    /// In-flight server-side handlers, keyed by request ID. Holds the
    /// `AbortHandle` for the corresponding entry in `handler_futs`. Used to
    /// abort handlers on cancel.
    in_flight_handlers: BTreeMap<RequestId, InFlightHandler>,
    /// Handler futures driven directly by the driver's `run` loop instead
    /// of being `tokio::spawn`'d. One alloc per request (the `Box<dyn
    /// Future>` plus the `Arc<Task>` inside `FuturesUnordered::push`),
    /// versus the `tokio::spawn` path which allocates a `Cell<T, S>` and
    /// memcpy's `Stage<Future, Output>` on every state transition.
    handler_futs: FuturesUnordered<HandlerFut>,
    /// Tracks live operations for dedup/attach/conflict within this session.
    /// Shared with DriverReplySink so seal can return waiters.
    live_operations: Arc<SyncMutex<LiveOperationTracker>>,
    /// Sending end of the driver-local control channel; cloned into channel
    /// sinks and credit replenishers.
    local_control_tx: mpsc::UnboundedSender<DriverLocalControl>,
    /// NOTE(review): presumably the channel used to deliver
    /// `drop_control_request` to the session; confirm against its use site.
    drop_control_seed: Option<mpsc::UnboundedSender<DropControlRequest>>,
    /// The drop-control request for this connection
    /// (`DropControlRequest::Close` with this connection's ID).
    drop_control_request: DropControlRequest,
    /// Weak reference to the current caller drop guard, if one has been created.
    drop_guard: SyncMutex<Option<Weak<CallerDropGuard>>>,
}
2286
/// Control messages sent to the driver over its local-control channel.
enum DriverLocalControl {
    /// Request that the given channel be closed.
    /// NOTE(review): the producer of this variant is outside this excerpt.
    CloseChannel {
        channel_id: ChannelId,
    },
    /// Request that the given channel be reset; sent by the credit
    /// replenisher when a bound receiver is dropped.
    ResetChannel {
        channel_id: ChannelId,
    },
    /// Request that `additional` credit be granted on the channel; sent by
    /// the credit replenisher once enough items have been consumed.
    GrantCredit {
        channel_id: ChannelId,
        additional: u32,
    },
}
2299
/// Batches consumed-item notifications for one inbound channel and asks the
/// driver to grant credit once a threshold is reached.
struct DriverChannelCreditReplenisher {
    /// Connection the channel belongs to.
    connection_id: ConnectionId,
    /// Channel whose credit is being replenished.
    channel_id: ChannelId,
    /// Optional debug context reported back through `debug_context()`.
    debug_context: Option<ChannelDebugContext>,
    /// Weak handle to the shared driver state; bookkeeping is skipped when
    /// the state is already gone.
    shared: Weak<DriverShared>,
    /// Number of consumed items that triggers a grant: half the initial
    /// credit, but at least 1.
    threshold: u32,
    /// Sender used to queue `GrantCredit` / `ResetChannel` for the driver.
    local_control_tx: mpsc::UnboundedSender<DriverLocalControl>,
    /// Optional observer reported back through `observer()`.
    observer: Option<VoxObserverHandle>,
    /// Items consumed since the last grant was queued.
    pending: std::sync::Mutex<u32>,
}
2310
2311impl DriverChannelCreditReplenisher {
2312    fn new(
2313        connection_id: ConnectionId,
2314        channel_id: ChannelId,
2315        debug_context: Option<ChannelDebugContext>,
2316        shared: Weak<DriverShared>,
2317        initial_credit: u32,
2318        local_control_tx: mpsc::UnboundedSender<DriverLocalControl>,
2319        observer: Option<VoxObserverHandle>,
2320    ) -> Self {
2321        Self {
2322            connection_id,
2323            channel_id,
2324            debug_context,
2325            shared,
2326            threshold: (initial_credit / 2).max(1),
2327            local_control_tx,
2328            observer,
2329            pending: std::sync::Mutex::new(0),
2330        }
2331    }
2332}
2333
2334impl ChannelCreditReplenisher for DriverChannelCreditReplenisher {
2335    fn on_item_consumed(&self) {
2336        let mut pending = self.pending.lock().expect("pending credit mutex poisoned");
2337        *pending += 1;
2338        if let Some(shared) = self.shared.upgrade() {
2339            shared.record_item_consumed(self.channel_id);
2340            shared.record_pending_local_grant(self.channel_id, *pending);
2341        }
2342        if *pending < self.threshold {
2343            return;
2344        }
2345
2346        let additional = *pending;
2347        *pending = 0;
2348        if let Some(shared) = self.shared.upgrade() {
2349            shared.record_pending_local_grant(self.channel_id, additional);
2350        }
2351        let _ = self.local_control_tx.send(DriverLocalControl::GrantCredit {
2352            channel_id: self.channel_id,
2353            additional,
2354        });
2355    }
2356
2357    fn on_receiver_dropped(&self) {
2358        if let Some(shared) = self.shared.upgrade() {
2359            shared.record_receiver_dropped(self.channel_id);
2360        }
2361        let _ = self
2362            .local_control_tx
2363            .send(DriverLocalControl::ResetChannel {
2364                channel_id: self.channel_id,
2365            });
2366    }
2367
2368    fn channel_id(&self) -> Option<ChannelId> {
2369        Some(self.channel_id)
2370    }
2371
2372    fn connection_id(&self) -> Option<ConnectionId> {
2373        Some(self.connection_id)
2374    }
2375
2376    fn debug_context(&self) -> Option<ChannelDebugContext> {
2377        self.debug_context
2378    }
2379
2380    fn observer(&self) -> Option<VoxObserverHandle> {
2381        self.observer.clone()
2382    }
2383}
2384
2385impl<H: Handler<DriverReplySink>> Driver<H> {
2386    // r[impl rpc.channel.connection-closure]
2387    fn close_all_channel_runtime_state(&self, teardown: ChannelRuntimeTeardown) {
2388        let mut credits = self.shared.channel_credits.lock();
2389        for semaphore in credits.values() {
2390            semaphore.close();
2391        }
2392        // Track all outbound channel IDs that are being cleared so we can suppress
2393        // ChannelClose messages triggered by aborted handler tasks dropping their Tx handles.
2394        let mut stale = self.shared.stale_close_channels.lock();
2395        stale.extend(credits.keys().copied());
2396        credits.clear();
2397        drop(credits);
2398
2399        let channel_senders = {
2400            let mut senders = self.shared.channel_senders.lock();
2401            std::mem::take(&mut *senders)
2402        };
2403        if let ChannelRuntimeTeardown::ConnectionClosed(reason) = teardown {
2404            for (channel_id, sender) in channel_senders {
2405                let _ = sender.force_send(IncomingChannelMessage::ConnectionClosed(reason));
2406                self.shared
2407                    .observe_channel(channel_id, None, |channel| ChannelEvent::Closed {
2408                        channel,
2409                        reason: ChannelCloseReason::ConnectionClosed,
2410                    });
2411            }
2412        }
2413        self.shared.channel_receivers.lock().clear();
2414        self.shared.terminal_channels.lock().clear();
2415    }
2416
2417    fn close_outbound_channel(&self, channel_id: ChannelId) {
2418        self.shared.terminal_channels.lock().insert(channel_id);
2419        if let Some(semaphore) = self.shared.channel_credits.lock().remove(&channel_id) {
2420            semaphore.close();
2421        }
2422    }
2423
2424    fn abort_channel_handlers(&mut self) {
2425        for in_flight in self.in_flight_handlers.values() {
2426            if self.handler.args_have_channels(in_flight.method_id) {
2427                if let Some(operation_id) = in_flight.operation_id {
2428                    self.shared.operations.remove(operation_id);
2429                    self.live_operations.lock().release(operation_id);
2430                }
2431                in_flight.abort.abort();
2432            }
2433        }
2434    }
2435
2436    pub fn new(handle: ConnectionHandle, handler: H) -> Self {
2437        Self::with_operation_store(handle, handler, Arc::new(InMemoryOperationStore::default()))
2438    }
2439
2440    pub fn with_operation_store(
2441        handle: ConnectionHandle,
2442        handler: H,
2443        operation_store: Arc<dyn OperationStore>,
2444    ) -> Self {
2445        let conn_id = handle.connection_id();
2446        let ConnectionHandle {
2447            sender,
2448            rx,
2449            failures_rx,
2450            control_tx,
2451            closed_rx,
2452            resumed_rx,
2453            local_settings,
2454            peer_settings,
2455            parity,
2456            peer_supports_retry,
2457            observer,
2458        } = handle;
2459        let drop_control_request = DropControlRequest::Close(conn_id);
2460        let (local_control_tx, local_control_rx) = mpsc::unbounded_channel("driver.local_control");
2461        let (resume_processed_tx, _resume_processed_rx) = watch::channel(0_u64);
2462        Self {
2463            sender,
2464            rx,
2465            failures_rx,
2466            closed_rx,
2467            resumed_rx,
2468            resume_processed_tx,
2469            peer_supports_retry,
2470            local_control_rx,
2471            handler: Arc::new(handler),
2472            shared: Arc::new(DriverShared {
2473                connection_id: conn_id,
2474                pending_responses: SyncMutex::new("driver.pending_responses", BTreeMap::new()),
2475                request_ids: SyncMutex::new("driver.request_ids", IdAllocator::new(parity)),
2476                next_operation_id: AtomicU64::new(1),
2477                operations: operation_store,
2478                channel_ids: SyncMutex::new("driver.channel_ids", IdAllocator::new(parity)),
2479                channel_senders: SyncMutex::new("driver.channel_senders", BTreeMap::new()),
2480                channel_receivers: SyncMutex::new("driver.channel_receivers", BTreeMap::new()),
2481                channel_credits: SyncMutex::new("driver.channel_credits", BTreeMap::new()),
2482                channel_contexts: SyncMutex::new("driver.channel_contexts", BTreeMap::new()),
2483                request_debug: SyncMutex::new("driver.request_debug", BTreeMap::new()),
2484                channel_debug: SyncMutex::new("driver.channel_debug", BTreeMap::new()),
2485                last_inbound_message_at: SyncMutex::new("driver.last_inbound_message_at", None),
2486                last_outbound_message_at: SyncMutex::new("driver.last_outbound_message_at", None),
2487                close_reason: SyncMutex::new("driver.close_reason", None),
2488                terminal_channels: SyncMutex::new("driver.terminal_channels", HashSet::new()),
2489                stale_close_channels: SyncMutex::new(
2490                    "driver.stale_close_channels",
2491                    std::collections::HashSet::new(),
2492                ),
2493                local_initial_channel_credit: local_settings.initial_channel_credit,
2494                peer_initial_channel_credit: peer_settings.initial_channel_credit,
2495                observer,
2496            }),
2497            in_flight_handlers: BTreeMap::new(),
2498            handler_futs: FuturesUnordered::new(),
2499            live_operations: Arc::new(SyncMutex::new(
2500                "driver.live_operations",
2501                LiveOperationTracker::new(),
2502            )),
2503            local_control_tx,
2504            drop_control_seed: control_tx,
2505            drop_control_request,
2506            drop_guard: SyncMutex::new("driver.drop_guard", None),
2507        }
2508    }
2509
2510    /// Get a cloneable caller handle for making outgoing calls.
2511    // r[impl rpc.caller.liveness.refcounted]
2512    // r[impl rpc.caller.liveness.last-drop-closes-connection]
2513    // r[impl rpc.caller.liveness.root-internal-close]
2514    // r[impl rpc.caller.liveness.root-teardown-condition]
2515    fn existing_drop_guard(&self) -> Option<Arc<CallerDropGuard>> {
2516        self.drop_guard.lock().as_ref().and_then(Weak::upgrade)
2517    }
2518
2519    fn connection_drop_guard(&self) -> Option<Arc<CallerDropGuard>> {
2520        if let Some(existing) = self.existing_drop_guard() {
2521            Some(existing)
2522        } else if let Some(seed) = &self.drop_control_seed {
2523            let mut guard = self.drop_guard.lock();
2524            if let Some(existing) = guard.as_ref().and_then(Weak::upgrade) {
2525                Some(existing)
2526            } else {
2527                let arc = Arc::new(CallerDropGuard {
2528                    control_tx: seed.clone(),
2529                    request: self.drop_control_request,
2530                });
2531                *guard = Some(Arc::downgrade(&arc));
2532                Some(arc)
2533            }
2534        } else {
2535            None
2536        }
2537    }
2538
2539    pub fn caller(&self) -> DriverCaller {
2540        let drop_guard = self.connection_drop_guard();
2541        DriverCaller {
2542            sender: self.sender.clone(),
2543            shared: Arc::clone(&self.shared),
2544            local_control_tx: self.local_control_tx.clone(),
2545            closed_rx: self.closed_rx.clone(),
2546            resumed_rx: self.resumed_rx.clone(),
2547            resume_processed_rx: self.resume_processed_tx.subscribe(),
2548            peer_supports_retry: self.peer_supports_retry,
2549            _drop_guard: drop_guard,
2550        }
2551    }
2552
2553    // r[impl rpc.debug.snapshot]
2554    pub fn debug_snapshot(&self) -> VoxDebugSnapshot {
2555        self.shared.debug_snapshot(
2556            &self.sender,
2557            self.shared
2558                .connection_debug_state(self.closed_rx.borrow().is_some()),
2559            DriverTaskStatus::Alive,
2560        )
2561    }
2562
2563    pub fn dump_debug_snapshot(&self) -> VoxDebugSnapshot {
2564        let snapshot = self.debug_snapshot();
2565        tracing::info!(?snapshot, "vox debug snapshot");
2566        snapshot
2567    }
2568
2569    fn internal_binder(&self) -> DriverChannelBinder {
2570        DriverChannelBinder {
2571            sender: self.sender.clone(),
2572            shared: Arc::clone(&self.shared),
2573            local_control_tx: self.local_control_tx.clone(),
2574            drop_guard: self.existing_drop_guard(),
2575        }
2576    }
2577
2578    // r[impl rpc.pipelining]
2579    /// Main loop: receive messages from the session and dispatch them.
2580    /// Handler calls run as spawned tasks — we don't block the driver
2581    /// loop waiting for a handler to finish.
2582    pub async fn run(&mut self) {
2583        let mut resumed_rx = self.resumed_rx.clone();
2584        let mut seen_resume_generation = *resumed_rx.borrow();
2585        loop {
2586            tracing::trace!("driver select loop top");
2587            tokio::select! {
2588                biased;
2589                changed = resumed_rx.changed() => {
2590                    if changed.is_err() {
2591                        tracing::trace!(
2592                            conn_id = self.sender.connection_id().0,
2593                            "resume notifier closed, exiting driver"
2594                        );
2595                        break;
2596                    }
2597                    let generation = *resumed_rx.borrow();
2598                    if generation != seen_resume_generation {
2599                        seen_resume_generation = generation;
2600                        self.close_all_channel_runtime_state(ChannelRuntimeTeardown::DropOnly);
2601                        self.abort_channel_handlers();
2602                        let _ = self.resume_processed_tx.send(generation);
2603                    }
2604                }
2605                Some(ctrl) = self.local_control_rx.recv() => {
2606                    self.handle_local_control(ctrl).await;
2607                }
2608                Some((req_id, disposition)) = self.failures_rx.recv() => {
2609                    tracing::trace!(%req_id, ?disposition, "failures_rx fired");
2610                    let in_flight_found = self.in_flight_handlers.contains_key(&req_id);
2611                    let in_flight_method_id =
2612                        self.in_flight_handlers.get(&req_id).map(|in_flight| in_flight.method_id);
2613                    let reply_disposition = self
2614                        .in_flight_handlers
2615                        .get(&req_id)
2616                        .map(|in_flight| {
2617                            let has_channels =
2618                                self.handler.args_have_channels(in_flight.method_id);
2619                            if has_channels && !in_flight.retry.idem {
2620                                Some(FailureDisposition::Indeterminate)
2621                            } else if has_channels && in_flight.retry.idem {
2622                                None
2623                            } else {
2624                                Some(disposition)
2625                            }
2626                        })
2627                        .unwrap_or(Some(disposition));
2628                    tracing::trace!(%req_id, in_flight_found, ?reply_disposition, "failures_rx computed disposition");
2629                    // Clean up the handler tracking entry.
2630                    self.in_flight_handlers.remove(&req_id);
2631                    self.shared.finish_request(req_id, RequestDebugState::Failed);
2632                    tracing::trace!(%req_id, in_flight = self.in_flight_handlers.len(), "handler removed on failure");
2633                    let had_pending = self.shared.pending_responses.lock().remove(&req_id).is_some();
2634                    tracing::trace!(%req_id, had_pending, "failures_rx checked pending_responses");
2635                    if !had_pending {
2636                        let Some(reply_disposition) = reply_disposition else {
2637                            tracing::trace!(%req_id, "failures_rx: no reply_disposition, skipping");
2638                            continue;
2639                        };
2640                        tracing::trace!(%req_id, ?reply_disposition, "failures_rx: sending error response");
2641                        let vox_error = match reply_disposition {
2642                            FailureDisposition::Cancelled => VoxError::Cancelled,
2643                            FailureDisposition::Indeterminate => VoxError::Indeterminate,
2644                        };
2645                        if let Some(method_id) = in_flight_method_id
2646                            && let Some(response_shape) = self.handler.response_wire_shape(method_id)
2647                            && let Ok(extracted) = vox_types::extract_schemas(response_shape)
2648                        {
2649                            let registry = vox_types::build_registry(&extracted.schemas);
2650                            let error: Result<(), VoxError<core::convert::Infallible>> =
2651                                Err(vox_error);
2652                            let encoded = vox_postcard::to_vec(&error)
2653                                .expect("serialize runtime-generated error response");
2654                            let mut response = RequestResponse {
2655                                ret: Payload::PostcardBytes(Box::leak(encoded.into_boxed_slice())),
2656                                metadata: Default::default(),
2657                                schemas: Default::default(),
2658                            };
2659                            self.sender.prepare_response_from_source(
2660                                req_id,
2661                                method_id,
2662                                &extracted.root,
2663                                &registry,
2664                                &mut response,
2665                            );
2666                            let _ = self.sender.send_response(req_id, response).await;
2667                        } else {
2668                            let error: Result<(), VoxError<core::convert::Infallible>> =
2669                                Err(vox_error);
2670                            let _ = self.sender.send_response(req_id, RequestResponse {
2671                                ret: Payload::outgoing(&error),
2672                                metadata: Default::default(),
2673                                schemas: Default::default(),
2674                            }).await;
2675                        }
2676                        tracing::trace!(%req_id, "failures_rx: error response sent");
2677                    }
2678                }
2679                recv = self.rx.recv() => {
2680                    match recv {
2681                        Some(recv) => {
2682                            self.handle_recv(recv).await;
2683                        }
2684                        None => {
2685                            tracing::trace!("driver rx closed, exiting loop");
2686                            break;
2687                        }
2688                    }
2689                }
2690                // The handler-future arm only fires when at least one
2691                // handler is in flight. The guard is essential:
2692                // `FuturesUnordered::next` on an empty stream returns
2693                // `Poll::Ready(None)` immediately, which would spin the
2694                // select loop.
2695                Some(item) = self.handler_futs.next(), if !self.handler_futs.is_empty() => {
2696                    match item {
2697                        Ok(HandlerCompletion::Finished(req_id)) => {
2698                            let removed = self.in_flight_handlers.remove(&req_id).is_some();
2699                            self.shared.finish_request(req_id, RequestDebugState::Finished);
2700                            tracing::trace!(
2701                                %req_id,
2702                                removed,
2703                                in_flight = self.in_flight_handlers.len(),
2704                                "handler completion processed",
2705                            );
2706                        }
2707                        Ok(HandlerCompletion::Panicked { request_id, method_id }) => {
2708                            tracing::error!(
2709                                req_id = ?request_id,
2710                                ?method_id,
2711                                "vox driver handler panicked; waiting for reply-sink failure path"
2712                            );
2713                        }
2714                        Err(_aborted) => {
2715                            // Cancel/abort paths already removed the entry
2716                            // before flipping the AbortHandle. Nothing to do.
2717                        }
2718                    }
2719                }
2720            }
2721        }
2722
2723        for (_, in_flight) in std::mem::take(&mut self.in_flight_handlers) {
2724            if !in_flight.retry.persist {
2725                in_flight.abort.abort();
2726            }
2727        }
2728        self.shared.pending_responses.lock().clear();
2729        self.shared.request_debug.lock().clear();
2730        let close_reason =
2731            (*self.closed_rx.borrow()).unwrap_or(ConnectionCloseReason::SessionShutdown);
2732        self.shared.set_connection_closed(close_reason);
2733
2734        // Connection is gone: drop channel runtime state so any registered Rx
2735        // receivers observe closure instead of hanging on recv(), and wake any
2736        // outbound Tx handles waiting for grant-credit.
2737        self.close_all_channel_runtime_state(ChannelRuntimeTeardown::ConnectionClosed(
2738            close_reason,
2739        ));
2740    }
2741
2742    async fn handle_local_control(&mut self, control: DriverLocalControl) {
2743        match control {
2744            DriverLocalControl::CloseChannel { channel_id } => {
2745                // Don't send Close for channels that were cleared during session resume.
2746                // When handler tasks are aborted, their dropped Tx handles trigger
2747                // close_channel_on_drop, but we should not send Close to the peer
2748                // for channels the peer has also cleared.
2749                if self.shared.stale_close_channels.lock().remove(&channel_id) {
2750                    tracing::trace!(%channel_id, "suppressing ChannelClose for stale channel");
2751                    return;
2752                }
2753                self.close_outbound_channel(channel_id);
2754                self.shared
2755                    .observe_channel(channel_id, None, |channel| ChannelEvent::Closed {
2756                        channel,
2757                        reason: ChannelCloseReason::Local,
2758                    });
2759                self.shared.mark_outbound_progress();
2760                let _ = self
2761                    .sender
2762                    .send(ConnectionMessage::Channel(ChannelMessage {
2763                        id: channel_id,
2764                        body: ChannelBody::Close(ChannelClose {
2765                            metadata: Default::default(),
2766                        }),
2767                    }))
2768                    .await;
2769            }
2770            DriverLocalControl::ResetChannel { channel_id } => {
2771                self.shared.channel_senders.lock().remove(&channel_id);
2772                self.shared.channel_receivers.lock().remove(&channel_id);
2773                self.close_outbound_channel(channel_id);
2774                self.shared
2775                    .observe_channel(channel_id, None, |channel| ChannelEvent::Reset {
2776                        channel,
2777                        reason: ChannelResetReason::Local,
2778                    });
2779                self.shared.mark_outbound_progress();
2780                let _ = self
2781                    .sender
2782                    .send(ConnectionMessage::Channel(ChannelMessage {
2783                        id: channel_id,
2784                        body: ChannelBody::Reset(vox_types::ChannelReset {
2785                            metadata: Default::default(),
2786                        }),
2787                    }))
2788                    .await;
2789            }
2790            DriverLocalControl::GrantCredit {
2791                channel_id,
2792                additional,
2793            } => {
2794                self.shared.observe_channel(channel_id, None, |channel| {
2795                    ChannelEvent::CreditGranted {
2796                        channel,
2797                        amount: additional,
2798                    }
2799                });
2800                self.shared.mark_outbound_progress();
2801                let _ = self
2802                    .sender
2803                    .send(ConnectionMessage::Channel(ChannelMessage {
2804                        id: channel_id,
2805                        body: ChannelBody::GrantCredit(vox_types::ChannelGrantCredit {
2806                            additional,
2807                        }),
2808                    }))
2809                    .await;
2810            }
2811        }
2812    }
2813
2814    async fn handle_recv(&mut self, recv: crate::session::RecvMessage) {
2815        self.shared.mark_inbound_progress();
2816        let crate::session::RecvMessage { schemas, msg, fds } = recv;
2817        let msg_ref = msg.get();
2818        let is_request = matches!(msg_ref, ConnectionMessage::Request(_));
2819        if is_request {
2820            if let ConnectionMessage::Request(req) = msg_ref {
2821                vox_types::dlog!(
2822                    "[driver] handle_recv request: conn={:?} req={:?} body={} method={:?}",
2823                    self.sender.connection_id(),
2824                    req.id,
2825                    match &req.body {
2826                        RequestBody::Call(_) => "Call",
2827                        RequestBody::Response(_) => "Response",
2828                        RequestBody::Cancel(_) => "Cancel",
2829                    },
2830                    match &req.body {
2831                        RequestBody::Call(call) => Some(call.method_id),
2832                        RequestBody::Response(_) | RequestBody::Cancel(_) => None,
2833                    }
2834                );
2835                match &req.body {
2836                    RequestBody::Call(call) => tracing::trace!(
2837                        conn_id = self.sender.connection_id().0,
2838                        req_id = req.id.0,
2839                        method_id = call.method_id.0,
2840                        "driver received call"
2841                    ),
2842                    RequestBody::Response(_) => tracing::trace!(
2843                        conn_id = self.sender.connection_id().0,
2844                        req_id = req.id.0,
2845                        "driver received response message"
2846                    ),
2847                    RequestBody::Cancel(_) => tracing::trace!(
2848                        conn_id = self.sender.connection_id().0,
2849                        req_id = req.id.0,
2850                        "driver received cancel message"
2851                    ),
2852                }
2853            }
2854            let msg = msg.map(|m| match m {
2855                ConnectionMessage::Request(r) => r,
2856                _ => unreachable!(),
2857            });
2858            self.handle_request(msg, schemas, fds);
2859        } else {
2860            let msg = msg.map(|m| match m {
2861                ConnectionMessage::Channel(c) => c,
2862                _ => unreachable!(),
2863            });
2864            self.handle_channel(msg).await;
2865        }
2866    }
2867
2868    fn handle_request(
2869        &mut self,
2870        msg: SelfRef<RequestMessage<'static>>,
2871        schemas: Arc<vox_types::SchemaRecvTracker>,
2872        fds: vox_types::FrameFds,
2873    ) {
2874        let msg_ref = msg.get();
2875        let req_id = msg_ref.id;
2876        let is_call = matches!(&msg_ref.body, RequestBody::Call(_));
2877        let is_response = matches!(&msg_ref.body, RequestBody::Response(_));
2878        let is_cancel = matches!(&msg_ref.body, RequestBody::Cancel(_));
2879
2880        if is_call {
2881            let method_id = match &msg_ref.body {
2882                RequestBody::Call(call) => call.method_id,
2883                _ => unreachable!(),
2884            };
2885            vox_types::dlog!(
2886                "[driver] inbound call: conn={:?} req={:?} method={:?}",
2887                self.sender.connection_id(),
2888                req_id,
2889                method_id
2890            );
2891            // r[impl rpc.request]
2892            // r[impl rpc.error.scope]
2893            let call = msg.map(|m| match m.body {
2894                RequestBody::Call(c) => c,
2895                _ => unreachable!(),
2896            });
2897            let call_ref = call.get();
2898            let handler = Arc::clone(&self.handler);
2899            let retry = handler.retry_policy(call_ref.method_id);
2900            // Idempotent requests can be re-executed safely; skip operation tracking/storage.
2901            let operation_id = metadata_operation_id(&call_ref.metadata).filter(|_| !retry.idem);
2902            let method_id = call_ref.method_id;
2903
2904            if let Some(operation_id) = operation_id {
2905                // 1. Check live tracker (in-flight operations in this session)
2906                let admit = self.live_operations.lock().admit(
2907                    operation_id,
2908                    call_ref.method_id,
2909                    incoming_args_bytes(call_ref),
2910                    retry,
2911                    req_id,
2912                );
2913                match admit {
2914                    AdmitResult::Attached => return,
2915                    AdmitResult::Conflict => {
2916                        let sender = self.sender.clone();
2917                        moire::task::spawn(
2918                            async move {
2919                                let error: Result<(), VoxError<core::convert::Infallible>> =
2920                                    Err(VoxError::InvalidPayload("operation ID conflict".into()));
2921                                let _ = sender
2922                                    .send_response(
2923                                        req_id,
2924                                        RequestResponse {
2925                                            ret: Payload::outgoing(&error),
2926                                            metadata: Default::default(),
2927                                            schemas: Default::default(),
2928                                        },
2929                                    )
2930                                    .await;
2931                            }
2932                            .named("operation_reject"),
2933                        );
2934                        return;
2935                    }
2936                    AdmitResult::Start => {}
2937                }
2938
2939                // 2. Check persistent store (sealed/admitted from previous sessions)
2940                match self.shared.operations.lookup(operation_id) {
2941                    crate::OperationState::Sealed => {
2942                        // Replay the sealed response.
2943                        if let Some(sealed) = self.shared.operations.get_sealed(operation_id) {
2944                            let sender = self.sender.clone();
2945                            let method_id = call_ref.method_id;
2946                            let response_shape = self.handler.response_wire_shape(method_id);
2947                            // Remove from live tracker — we're replaying, not running a handler.
2948                            self.live_operations.lock().seal(operation_id);
2949                            moire::task::spawn(
2950                                async move {
2951                                    if replay_sealed_response(
2952                                        sender.clone(),
2953                                        req_id,
2954                                        method_id,
2955                                        sealed.response.as_bytes(),
2956                                        response_shape,
2957                                    )
2958                                    .await
2959                                    .is_err()
2960                                    {
2961                                        sender.mark_failure(req_id, FailureDisposition::Cancelled);
2962                                    }
2963                                }
2964                                .named("operation_replay"),
2965                            );
2966                            return;
2967                        }
2968                    }
2969                    crate::OperationState::Admitted => {
2970                        // Previously admitted but never sealed — indeterminate.
2971                        self.live_operations.lock().seal(operation_id);
2972                        let sender = self.sender.clone();
2973                        moire::task::spawn(
2974                            async move {
2975                                let error: Result<(), VoxError<core::convert::Infallible>> =
2976                                    Err(VoxError::Indeterminate);
2977                                let _ = sender
2978                                    .send_response(
2979                                        req_id,
2980                                        RequestResponse {
2981                                            ret: Payload::outgoing(&error),
2982                                            metadata: Default::default(),
2983                                            schemas: Default::default(),
2984                                        },
2985                                    )
2986                                    .await;
2987                            }
2988                            .named("operation_indeterminate"),
2989                        );
2990                        return;
2991                    }
2992                    crate::OperationState::Unknown => {
2993                        // New operation — admit in the persistent store if non-idem.
2994                        // Idem operations can safely be re-executed, no need to track.
2995                        if !retry.idem {
2996                            self.shared.operations.admit(operation_id);
2997                        }
2998                    }
2999                }
3000            }
3001            let reply = DriverReplySink {
3002                sender: Some(self.sender.clone()),
3003                request_id: req_id,
3004                method_id: call_ref.method_id,
3005                retry,
3006                operation_id,
3007                operations: operation_id.map(|_| Arc::clone(&self.shared.operations)),
3008                live_operations: operation_id.map(|_| Arc::clone(&self.live_operations)),
3009                binder: self.internal_binder(),
3010                handler_response_shape: handler.response_wire_shape(call_ref.method_id),
3011            };
3012            self.shared.start_request(
3013                req_id,
3014                method_id,
3015                None,
3016                None,
3017                RequestDebugState::Dispatching,
3018            );
3019            let (abort, abort_reg) = AbortHandle::new_pair();
3020            let handler_fut: Pin<Box<dyn MaybeSendFuture<Output = HandlerCompletion> + 'static>> =
3021                Box::pin(async move {
3022                    tracing::debug!(
3023                        req_id = ?req_id,
3024                        method_id = ?method_id,
3025                        "vox driver handler starting"
3026                    );
3027                    vox_types::dlog!(
3028                        "[driver] handler start: req={:?} method={:?}",
3029                        req_id,
3030                        method_id
3031                    );
3032                    let result = AssertUnwindSafe(handler.handle(call, reply, schemas))
3033                        .catch_unwind()
3034                        .await;
3035                    if result.is_err() {
3036                        return HandlerCompletion::Panicked {
3037                            request_id: req_id,
3038                            method_id,
3039                        };
3040                    }
3041                    tracing::debug!(
3042                        req_id = ?req_id,
3043                        method_id = ?method_id,
3044                        "vox driver handler finished"
3045                    );
3046                    vox_types::dlog!(
3047                        "[driver] handler done: req={:?} method={:?}",
3048                        req_id,
3049                        method_id
3050                    );
3051                    HandlerCompletion::Finished(req_id)
3052                });
3053            self.handler_futs
3054                .push(Abortable::new(handler_fut, abort_reg));
3055            self.in_flight_handlers.insert(
3056                req_id,
3057                InFlightHandler {
3058                    abort,
3059                    method_id,
3060                    retry,
3061                    operation_id,
3062                },
3063            );
3064            tracing::trace!(%req_id, in_flight = self.in_flight_handlers.len(), "handler inserted");
3065        } else if is_response {
3066            // r[impl rpc.response.one-per-request]
3067            vox_types::dlog!(
3068                "[driver] inbound response: conn={:?} req={:?}",
3069                self.sender.connection_id(),
3070                req_id
3071            );
3072            tracing::trace!(%req_id, "driver received response");
3073            if let Some(tx) = self.shared.pending_responses.lock().remove(&req_id) {
3074                vox_types::dlog!("[driver] routing response to waiter: req={:?}", req_id);
3075                tracing::trace!(%req_id, "routing response to pending oneshot");
3076                let _: Result<(), _> = tx.send(PendingResponse { msg, schemas, fds });
3077            } else {
3078                vox_types::dlog!("[driver] dropped unmatched response: req={:?}", req_id);
3079                tracing::trace!(%req_id, "no pending response slot for this req_id");
3080            }
3081        } else if is_cancel {
3082            vox_types::dlog!(
3083                "[driver] inbound cancel: conn={:?} req={:?}",
3084                self.sender.connection_id(),
3085                req_id
3086            );
3087            // r[impl rpc.cancel]
3088            // r[impl rpc.cancel.channels]
3089            tracing::trace!(%req_id, in_flight = self.in_flight_handlers.contains_key(&req_id), "received cancel");
3090            match self.live_operations.lock().cancel(req_id) {
3091                CancelResult::NotFound => {
3092                    let should_abort = self
3093                        .in_flight_handlers
3094                        .get(&req_id)
3095                        .map(|in_flight| !in_flight.retry.persist)
3096                        .unwrap_or(false);
3097                    tracing::trace!(%req_id, should_abort, "cancel: not in live operations");
3098                    if should_abort && let Some(in_flight) = self.in_flight_handlers.remove(&req_id)
3099                    {
3100                        tracing::trace!(%req_id, "aborting handler");
3101                        in_flight.abort.abort();
3102                        self.shared
3103                            .finish_request(req_id, RequestDebugState::Failed);
3104                        tracing::trace!(%req_id, in_flight = self.in_flight_handlers.len(), "handler removed on cancel");
3105                    }
3106                }
3107                CancelResult::Detached => {}
3108                CancelResult::Abort {
3109                    owner_request_id,
3110                    waiters,
3111                } => {
3112                    if let Some(in_flight) = self.in_flight_handlers.remove(&owner_request_id) {
3113                        if let Some(op_id) = in_flight.operation_id {
3114                            self.shared.operations.remove(op_id);
3115                        }
3116                        in_flight.abort.abort();
3117                        self.shared
3118                            .finish_request(owner_request_id, RequestDebugState::Failed);
3119                        tracing::trace!(%owner_request_id, in_flight = self.in_flight_handlers.len(), "owner handler removed on abort");
3120                    }
3121                    for waiter in waiters {
3122                        self.sender
3123                            .mark_failure(waiter, FailureDisposition::Cancelled);
3124                    }
3125                }
3126            }
3127            // The response is sent automatically: aborting drops DriverReplySink →
3128            // mark_failure fires → failures_rx arm sends VoxError::Cancelled.
3129        }
3130    }
3131
3132    async fn handle_channel(&mut self, msg: SelfRef<ChannelMessage<'static>>) {
3133        let msg_ref = msg.get();
3134        let chan_id = msg_ref.id;
3135        enum ChannelBodyKind {
3136            Item,
3137            Close,
3138            Reset,
3139            GrantCredit(u32),
3140        }
3141        let body_kind = match &msg_ref.body {
3142            ChannelBody::Item(_) => ChannelBodyKind::Item,
3143            ChannelBody::Close(_) => ChannelBodyKind::Close,
3144            ChannelBody::Reset(_) => ChannelBodyKind::Reset,
3145            ChannelBody::GrantCredit(grant) => ChannelBodyKind::GrantCredit(grant.additional),
3146        };
3147
3148        match body_kind {
3149            // r[impl rpc.channel.item]
3150            // r[impl rpc.channel.delivery.reliable]
3151            ChannelBodyKind::Item => {
3152                if self.shared.terminal_channels.lock().contains(&chan_id) {
3153                    self.shared.record_inbound_item_not_enqueued(chan_id);
3154                    tracing::trace!(
3155                        conn_id = self.sender.connection_id().0,
3156                        channel_id = chan_id.0,
3157                        "driver dropped item for terminal channel"
3158                    );
3159                    return;
3160                }
3161
3162                tracing::trace!(
3163                    conn_id = self.sender.connection_id().0,
3164                    channel_id = chan_id.0,
3165                    "driver received channel item"
3166                );
3167                let item = msg.map(|m| match m.body {
3168                    ChannelBody::Item(item) => item,
3169                    _ => unreachable!(),
3170                });
3171                let sender = self.shared.inbound_channel_sender(chan_id);
3172                if sender
3173                    .send(IncomingChannelMessage::Item(item))
3174                    .await
3175                    .is_err()
3176                {
3177                    self.shared.record_inbound_item_not_enqueued(chan_id);
3178                    self.shared.channel_senders.lock().remove(&chan_id);
3179                    self.shared.channel_receivers.lock().remove(&chan_id);
3180                    self.close_outbound_channel(chan_id);
3181                    let _ = self
3182                        .local_control_tx
3183                        .send(DriverLocalControl::ResetChannel {
3184                            channel_id: chan_id,
3185                        });
3186                    return;
3187                }
3188                self.shared
3189                    .observe_channel(chan_id, None, |channel| ChannelEvent::ItemReceived {
3190                        channel,
3191                    });
3192            }
3193            // r[impl rpc.channel.close]
3194            ChannelBodyKind::Close => {
3195                if self.shared.terminal_channels.lock().contains(&chan_id) {
3196                    return;
3197                }
3198                let sender = self.shared.inbound_channel_sender(chan_id);
3199                tracing::trace!(
3200                    conn_id = self.sender.connection_id().0,
3201                    channel_id = chan_id.0,
3202                    "driver received channel close"
3203                );
3204                let close = msg.map(|m| match m.body {
3205                    ChannelBody::Close(close) => close,
3206                    _ => unreachable!(),
3207                });
3208                let delivered = sender
3209                    .send(IncomingChannelMessage::Close(close))
3210                    .await
3211                    .is_ok();
3212                self.shared.channel_senders.lock().remove(&chan_id);
3213                self.shared.terminal_channels.lock().insert(chan_id);
3214                self.close_outbound_channel(chan_id);
3215                if !delivered {
3216                    self.shared.channel_receivers.lock().remove(&chan_id);
3217                    return;
3218                }
3219                self.shared
3220                    .observe_channel(chan_id, None, |channel| ChannelEvent::Closed {
3221                        channel,
3222                        reason: ChannelCloseReason::Remote,
3223                    });
3224            }
3225            // r[impl rpc.channel.reset]
3226            ChannelBodyKind::Reset => {
3227                if self.shared.terminal_channels.lock().contains(&chan_id) {
3228                    return;
3229                }
3230                let sender = self.shared.inbound_channel_sender(chan_id);
3231                tracing::trace!(
3232                    conn_id = self.sender.connection_id().0,
3233                    channel_id = chan_id.0,
3234                    "driver received channel reset"
3235                );
3236                let reset = msg.map(|m| match m.body {
3237                    ChannelBody::Reset(reset) => reset,
3238                    _ => unreachable!(),
3239                });
3240                let delivered = sender
3241                    .send(IncomingChannelMessage::Reset(reset))
3242                    .await
3243                    .is_ok();
3244                self.shared.channel_senders.lock().remove(&chan_id);
3245                self.shared.terminal_channels.lock().insert(chan_id);
3246                self.close_outbound_channel(chan_id);
3247                if !delivered {
3248                    self.shared.channel_receivers.lock().remove(&chan_id);
3249                    return;
3250                }
3251                self.shared
3252                    .observe_channel(chan_id, None, |channel| ChannelEvent::Reset {
3253                        channel,
3254                        reason: ChannelResetReason::Remote,
3255                    });
3256            }
3257            // r[impl rpc.flow-control.credit.grant]
3258            // r[impl rpc.flow-control.credit.grant.additive]
3259            ChannelBodyKind::GrantCredit(additional) => {
3260                self.shared.record_credit_received(chan_id, additional);
3261                self.shared.emit_channel_event(chan_id, None, |channel| {
3262                    ChannelEvent::CreditGranted {
3263                        channel,
3264                        amount: additional,
3265                    }
3266                });
3267                tracing::trace!(
3268                    conn_id = self.sender.connection_id().0,
3269                    channel_id = chan_id.0,
3270                    additional,
3271                    "driver received channel credit"
3272                );
3273                if let Some(semaphore) = self.shared.channel_credits.lock().get(&chan_id) {
3274                    semaphore.add_permits(additional as usize);
3275                }
3276            }
3277        }
3278    }
3279}