1use std::{
2 collections::{BTreeMap, HashMap, HashSet},
3 panic::AssertUnwindSafe,
4 pin::Pin,
5 sync::{
6 Arc, Weak,
7 atomic::{AtomicU64, Ordering},
8 },
9};
10
11use vox_types::time::Instant;
12
13use futures_util::future::{AbortHandle, Abortable, FutureExt as FuturesFutureExt};
14use futures_util::stream::{FuturesUnordered, StreamExt as _};
15use moire::sync::{Semaphore, SyncMutex};
16use tokio::sync::watch;
17
18use moire::task::FutureExt as _;
19use vox_types::{
20 BoxFut, CallResult, ChannelBinder, ChannelBody, ChannelClose, ChannelCreditReplenisher,
21 ChannelCreditReplenisherHandle, ChannelEventContext, ChannelId, ChannelItem,
22 ChannelLivenessHandle, ChannelMailboxReceiver, ChannelMailboxSender, ChannelMessage,
23 ChannelRetryMode, ChannelSink, ConnectionId, CreditSink, Handler, IdAllocator,
24 IncomingChannelMessage, MaybeSend, MaybeSendFuture, MaybeSync, Payload, ReplySink, RequestBody,
25 RequestCall, RequestId, RequestMessage, RequestResponse, SelfRef, TrySendError, TxError,
26 VoxError, channel_mailbox, ensure_operation_id, metadata_channel_retry_mode,
27 metadata_operation_id,
28};
29use vox_types::{
30 ChannelCloseReason, ChannelDebugContext, ChannelDirection, ChannelEvent, ChannelResetReason,
31 ChannelSendOutcome, ChannelTrySendOutcome, DriverEvent, RpcOutcome, VoxObserverHandle,
32};
33use vox_types::{
34 ChannelDebugSnapshot, ChannelReceiverState, ConnectionCloseReason, ConnectionDebugSnapshot,
35 ConnectionDebugState, DriverTaskStatus, RequestDebugSnapshot, RequestDebugState,
36 VoxDebugSnapshot,
37};
38
39use crate::session::{
40 ConnectionHandle, ConnectionMessage, ConnectionSender, DropControlRequest, FailureDisposition,
41};
42use crate::{InMemoryOperationStore, OperationStore};
43use moire::sync::mpsc;
44use vox_types::{OperationId, PostcardPayload};
45
/// Value delivered through a [`ResponseSlot`]: a request message bundled with
/// the schema tracker and frame fds that travel with it.
struct PendingResponse {
    /// The message, held as a `SelfRef` over a `'static`-parameterised
    /// `RequestMessage`.
    msg: SelfRef<RequestMessage<'static>>,
    /// Shared schema receive tracker.
    schemas: Arc<vox_types::SchemaRecvTracker>,
    /// Frame fds carried alongside the message.
    /// NOTE(review): presumably the descriptors received with the frame — confirm.
    fds: vox_types::FrameFds,
}
59
/// One-shot sender used to hand a single [`PendingResponse`] to the holder of
/// the matching receiver.
type ResponseSlot = moire::sync::oneshot::Sender<PendingResponse>;
61
/// Per-handler bookkeeping: an abort handle plus the method, retry policy and
/// optional operation id recorded for it.
/// NOTE(review): presumably one entry per currently running handler — confirm
/// against the driver loop.
struct InFlightHandler {
    /// Handle for aborting the `Abortable` handler future.
    abort: AbortHandle,
    /// Method the handler serves.
    method_id: vox_types::MethodId,
    /// Retry policy recorded for the request.
    retry: vox_types::RetryPolicy,
    /// Operation id, when the request carries one.
    operation_id: Option<OperationId>,
}
72
/// Output of a [`HandlerFut`].
enum HandlerCompletion {
    /// Carries the id of the request whose handler completed.
    Finished(RequestId),
    /// Carries the request and method ids of a handler that panicked.
    /// NOTE(review): presumably produced by catching the unwind around the
    /// handler — confirm at the construction site.
    Panicked {
        request_id: RequestId,
        method_id: vox_types::MethodId,
    },
}
92
/// An abortable, pinned and boxed future that resolves to a
/// [`HandlerCompletion`].
type HandlerFut = Abortable<Pin<Box<dyn MaybeSendFuture<Output = HandlerCompletion> + 'static>>>;
94
/// Selects how channel runtime state is torn down.
/// NOTE(review): the semantics below are inferred from the variant names —
/// confirm against the teardown code.
#[derive(Clone, Copy, Debug)]
enum ChannelRuntimeTeardown {
    /// Teardown without an associated connection close reason.
    DropOnly,
    /// Teardown caused by the connection closing, with the reason attached.
    ConnectionClosed(ConnectionCloseReason),
}
100
/// Tracks operations that are currently live and the requests attached to
/// each of them.
struct LiveOperationTracker {
    /// Live operations keyed by operation id.
    live: HashMap<OperationId, LiveOperation>,
    /// Reverse index: every attached request id maps to its operation id.
    request_to_operation: HashMap<RequestId, OperationId>,
}
116
/// State recorded for one live operation.
struct LiveOperation {
    /// Method the operation was admitted with.
    method_id: vox_types::MethodId,
    /// Hash over the method id and the argument bytes, used to detect a reused
    /// operation id with different contents.
    args_hash: u64,
    /// Request that first admitted the operation.
    owner_request_id: RequestId,
    /// Every request attached to the operation, including the owner.
    waiters: Vec<RequestId>,
    /// Retry policy supplied by the owner at admission.
    retry: vox_types::RetryPolicy,
}
124
/// Outcome of [`LiveOperationTracker::admit`].
enum AdmitResult {
    /// No live operation had this id; the request was recorded as its owner.
    Start,
    /// A matching live operation existed; the request was added as a waiter.
    Attached,
    /// A live operation with this id exists, but its method or argument hash
    /// differs.
    Conflict,
}
133
134impl LiveOperationTracker {
135 fn new() -> Self {
136 Self {
137 live: HashMap::new(),
138 request_to_operation: HashMap::new(),
139 }
140 }
141
142 fn admit(
143 &mut self,
144 operation_id: OperationId,
145 method_id: vox_types::MethodId,
146 args: &[u8],
147 retry: vox_types::RetryPolicy,
148 request_id: RequestId,
149 ) -> AdmitResult {
150 use std::hash::{Hash, Hasher};
151 let args_hash = {
152 let mut h = std::collections::hash_map::DefaultHasher::new();
153 method_id.hash(&mut h);
154 args.hash(&mut h);
155 h.finish()
156 };
157 let live_operations = self.live.len();
158
159 if let Some(live) = self.live.get_mut(&operation_id) {
160 if live.method_id != method_id || live.args_hash != args_hash {
161 let request_bindings = self.request_to_operation.len();
162 tracing::trace!(
163 %operation_id,
164 %request_id,
165 ?method_id,
166 live_operations,
167 request_bindings,
168 "live operation conflict"
169 );
170 return AdmitResult::Conflict;
171 }
172 live.waiters.push(request_id);
173 self.request_to_operation.insert(request_id, operation_id);
174 let waiters = live.waiters.len();
175 let request_bindings = self.request_to_operation.len();
176 tracing::trace!(
177 %operation_id,
178 %request_id,
179 ?method_id,
180 waiters,
181 live_operations,
182 request_bindings,
183 "live operation attached"
184 );
185 return AdmitResult::Attached;
186 }
187
188 self.live.insert(
189 operation_id,
190 LiveOperation {
191 method_id,
192 args_hash,
193 owner_request_id: request_id,
194 waiters: vec![request_id],
195 retry,
196 },
197 );
198 self.request_to_operation.insert(request_id, operation_id);
199 let live_operations = self.live.len();
200 let request_bindings = self.request_to_operation.len();
201 tracing::trace!(
202 %operation_id,
203 %request_id,
204 ?method_id,
205 live_operations,
206 request_bindings,
207 "live operation admitted"
208 );
209 AdmitResult::Start
210 }
211
212 fn seal(&mut self, operation_id: OperationId) -> Vec<RequestId> {
214 if let Some(live) = self.live.remove(&operation_id) {
215 for waiter in &live.waiters {
216 self.request_to_operation.remove(waiter);
217 }
218 let waiters = live.waiters.len();
219 let live_operations = self.live.len();
220 let request_bindings = self.request_to_operation.len();
221 tracing::trace!(
222 %operation_id,
223 waiters,
224 live_operations,
225 request_bindings,
226 "live operation sealed"
227 );
228 live.waiters
229 } else {
230 vec![]
231 }
232 }
233
234 fn release(&mut self, operation_id: OperationId) -> Option<LiveOperation> {
236 if let Some(live) = self.live.remove(&operation_id) {
237 for waiter in &live.waiters {
238 self.request_to_operation.remove(waiter);
239 }
240 let waiters = live.waiters.len();
241 let live_operations = self.live.len();
242 let request_bindings = self.request_to_operation.len();
243 tracing::trace!(
244 %operation_id,
245 waiters,
246 live_operations,
247 request_bindings,
248 "live operation released"
249 );
250 Some(live)
251 } else {
252 None
253 }
254 }
255
256 fn cancel(&mut self, request_id: RequestId) -> CancelResult {
258 let Some(&operation_id) = self.request_to_operation.get(&request_id) else {
259 return CancelResult::NotFound;
260 };
261 let live_operations = self.live.len();
262 let Some(live) = self.live.get_mut(&operation_id) else {
263 self.request_to_operation.remove(&request_id);
264 return CancelResult::NotFound;
265 };
266
267 if live.retry.persist {
268 if live.owner_request_id == request_id {
270 return CancelResult::NotFound; }
272 live.waiters.retain(|w| *w != request_id);
273 self.request_to_operation.remove(&request_id);
274 let waiters = live.waiters.len();
275 let request_bindings = self.request_to_operation.len();
276 tracing::trace!(
277 %operation_id,
278 %request_id,
279 waiters,
280 live_operations,
281 request_bindings,
282 "live operation detached waiter"
283 );
284 CancelResult::Detached
285 } else {
286 let live = self.live.remove(&operation_id).unwrap();
288 for waiter in &live.waiters {
289 self.request_to_operation.remove(waiter);
290 }
291 let waiters = live.waiters.len();
292 let live_operations = self.live.len();
293 let request_bindings = self.request_to_operation.len();
294 tracing::trace!(
295 %operation_id,
296 %request_id,
297 waiters,
298 live_operations,
299 request_bindings,
300 "live operation aborted"
301 );
302 CancelResult::Abort {
303 owner_request_id: live.owner_request_id,
304 waiters: live.waiters,
305 }
306 }
307 }
308}
309
/// Outcome of [`LiveOperationTracker::cancel`].
enum CancelResult {
    /// The request is not bound to a live operation, or it owns an operation
    /// whose retry policy has `persist` set; nothing is aborted.
    NotFound,
    /// The request was a non-owner waiter of a persistent operation and has
    /// been detached; the operation stays live.
    Detached,
    /// The operation was removed. Carries its owner and every request that
    /// was attached at that point.
    Abort {
        owner_request_id: RequestId,
        waiters: Vec<RequestId>,
    },
}
318
/// Debug bookkeeping kept per request; turned into a `RequestDebugSnapshot`
/// by `snapshot`.
#[derive(Clone)]
struct RequestRuntimeDebug {
    /// Method id of the request.
    method_id: vox_types::MethodId,
    /// Service name, when known.
    service: Option<&'static str>,
    /// Method name, when known.
    method: Option<&'static str>,
    /// Instant at which the entry was created; snapshot `age` is measured from it.
    started_at: Instant,
    /// Current debug state of the request.
    state: RequestDebugState,
    /// Copied verbatim into snapshots.
    /// NOTE(review): presumably whether the response sender is blocked — confirm.
    response_sender_blocked: Option<bool>,
    /// Channels associated with the request.
    associated_channels: Vec<ChannelId>,
}
329
330impl RequestRuntimeDebug {
331 fn snapshot(&self, request_id: RequestId, now: Instant) -> RequestDebugSnapshot {
332 RequestDebugSnapshot {
333 request_id,
334 service: self.service,
335 method: self.method,
336 method_id: self.method_id,
337 age: now.saturating_duration_since(self.started_at),
338 state: self.state,
339 response_sender_blocked: self.response_sender_blocked,
340 associated_channels: self.associated_channels.clone(),
341 }
342 }
343}
344
/// Debug counters and timestamps kept per channel; turned into a
/// `ChannelDebugSnapshot` by `snapshot`.
#[derive(Clone)]
struct ChannelRuntimeDebug {
    /// Direction the entry was created with.
    direction: ChannelDirection,
    /// Optional debug context; filled in once by `merge_debug` if initially absent.
    debug: Option<ChannelDebugContext>,
    /// Initial credit the entry was created with.
    initial_credit: u32,
    /// Incremented per received item; decremented per consumed item or per
    /// item that was not enqueued (saturating at zero).
    inbound_queue_len: usize,
    /// `Some(initial_credit)` for `Rx` entries, `None` for `Tx` entries.
    inbound_queue_capacity: Option<usize>,
    /// Receiver state; starts as `Present` and is updated on close, reset and
    /// receiver drop.
    receiver_state: ChannelReceiverState,
    /// Time of the last successful send (`send` or `try_send`).
    last_item_sent_at: Option<Instant>,
    /// Time of the last received item.
    last_item_received_at: Option<Instant>,
    /// Time of the last consumed item.
    last_item_consumed_at: Option<Instant>,
    /// Time of the last credit grant.
    last_credit_granted_at: Option<Instant>,
    /// Time of the last credit receipt.
    last_credit_received_at: Option<Instant>,
    /// Amount of the last credit grant.
    last_credit_granted_amount: Option<u32>,
    /// Amount of the last credit receipt.
    last_credit_received_amount: Option<u32>,
    /// Credit pending a local grant; reset to zero whenever a grant is recorded.
    pending_local_grant_credit: u32,
    /// Sum of all granted credit amounts.
    total_credit_granted: u64,
    /// Sum of all received credit amounts.
    total_credit_received: u64,
    /// Number of items successfully sent.
    sent: u64,
    /// Number of sends started.
    sends_started: u64,
    /// Number of sends finished, whatever the outcome.
    sends_completed: u64,
    /// Number of times a send was recorded as waiting for credit (cumulative).
    sends_waited_for_credit: u64,
    /// Number of `try_send` attempts that reported `FullCredit`.
    try_send_full_credit: u64,
    /// Number of `try_send` attempts that reported `FullRuntimeQueue`.
    try_send_full_runtime_queue: u64,
    /// Number of recorded close events.
    closed: u64,
    /// Number of recorded resets (including receiver drops).
    reset: u64,
    /// Number of recorded drops (close with reason `Dropped`, or receiver drop).
    dropped: u64,
    /// Number of items received.
    items_received: u64,
    /// Number of items consumed.
    items_consumed: u64,
    /// Number of credit grant events.
    credit_granted: u64,
    /// Number of credit receipt events.
    credit_received: u64,
    /// Reason of the most recent close, if any.
    close_reason: Option<ChannelCloseReason>,
    /// Reason of the most recent reset, if any.
    reset_reason: Option<ChannelResetReason>,
}
379
380impl ChannelRuntimeDebug {
381 fn new(
382 direction: ChannelDirection,
383 initial_credit: u32,
384 debug: Option<ChannelDebugContext>,
385 ) -> Self {
386 Self {
387 direction,
388 debug,
389 initial_credit,
390 inbound_queue_len: 0,
391 inbound_queue_capacity: match direction {
392 ChannelDirection::Rx => Some(initial_credit as usize),
393 ChannelDirection::Tx => None,
394 },
395 receiver_state: ChannelReceiverState::Present,
396 last_item_sent_at: None,
397 last_item_received_at: None,
398 last_item_consumed_at: None,
399 last_credit_granted_at: None,
400 last_credit_received_at: None,
401 last_credit_granted_amount: None,
402 last_credit_received_amount: None,
403 pending_local_grant_credit: 0,
404 total_credit_granted: 0,
405 total_credit_received: 0,
406 sent: 0,
407 sends_started: 0,
408 sends_completed: 0,
409 sends_waited_for_credit: 0,
410 try_send_full_credit: 0,
411 try_send_full_runtime_queue: 0,
412 closed: 0,
413 reset: 0,
414 dropped: 0,
415 items_received: 0,
416 items_consumed: 0,
417 credit_granted: 0,
418 credit_received: 0,
419 close_reason: None,
420 reset_reason: None,
421 }
422 }
423
424 fn merge_debug(&mut self, debug: Option<ChannelDebugContext>) {
425 if self.debug.is_none() {
426 self.debug = debug;
427 }
428 }
429
430 fn mark_item_received(&mut self, now: Instant) {
431 self.items_received = self.items_received.saturating_add(1);
432 self.inbound_queue_len = self.inbound_queue_len.saturating_add(1);
433 self.last_item_received_at = Some(now);
434 }
435
436 fn mark_closed(&mut self, reason: ChannelCloseReason) {
437 self.closed = self.closed.saturating_add(1);
438 self.close_reason = Some(reason);
439 self.receiver_state = ChannelReceiverState::Closed;
440 if reason == ChannelCloseReason::Dropped {
441 self.dropped = self.dropped.saturating_add(1);
442 self.receiver_state = ChannelReceiverState::Dropped;
443 }
444 }
445
446 fn mark_reset(&mut self, reason: ChannelResetReason) {
447 self.reset = self.reset.saturating_add(1);
448 self.reset_reason = Some(reason);
449 self.receiver_state = ChannelReceiverState::Reset;
450 }
451
452 fn mark_send_started(&mut self) {
453 self.sends_started = self.sends_started.saturating_add(1);
454 }
455
456 fn mark_send_waiting_for_credit(&mut self) {
457 self.sends_waited_for_credit = self.sends_waited_for_credit.saturating_add(1);
458 }
459
460 fn mark_send_finished(&mut self, outcome: ChannelSendOutcome, now: Instant) {
461 self.sends_completed = self.sends_completed.saturating_add(1);
462 if outcome == ChannelSendOutcome::Sent {
463 self.sent = self.sent.saturating_add(1);
464 self.last_item_sent_at = Some(now);
465 }
466 }
467
468 fn mark_try_send_outcome(&mut self, outcome: ChannelTrySendOutcome, now: Instant) {
469 match outcome {
470 ChannelTrySendOutcome::Sent => {
471 self.sent = self.sent.saturating_add(1);
472 self.last_item_sent_at = Some(now);
473 }
474 ChannelTrySendOutcome::FullCredit => {
475 self.try_send_full_credit = self.try_send_full_credit.saturating_add(1);
476 }
477 ChannelTrySendOutcome::FullRuntimeQueue => {
478 self.try_send_full_runtime_queue =
479 self.try_send_full_runtime_queue.saturating_add(1);
480 }
481 ChannelTrySendOutcome::Unbound | ChannelTrySendOutcome::Closed => {}
482 }
483 }
484
485 fn mark_item_consumed(&mut self, now: Instant) {
486 self.items_consumed = self.items_consumed.saturating_add(1);
487 self.inbound_queue_len = self.inbound_queue_len.saturating_sub(1);
488 self.last_item_consumed_at = Some(now);
489 }
490
491 fn mark_inbound_item_not_enqueued(&mut self) {
492 self.inbound_queue_len = self.inbound_queue_len.saturating_sub(1);
493 }
494
495 fn mark_credit_granted(&mut self, amount: u32, now: Instant) {
496 self.credit_granted = self.credit_granted.saturating_add(1);
497 self.total_credit_granted = self.total_credit_granted.saturating_add(amount as u64);
498 self.last_credit_granted_at = Some(now);
499 self.last_credit_granted_amount = Some(amount);
500 self.pending_local_grant_credit = 0;
501 }
502
503 fn mark_credit_received(&mut self, amount: u32, now: Instant) {
504 self.credit_received = self.credit_received.saturating_add(1);
505 self.total_credit_received = self.total_credit_received.saturating_add(amount as u64);
506 self.last_credit_received_at = Some(now);
507 self.last_credit_received_amount = Some(amount);
508 }
509
510 fn mark_receiver_dropped(&mut self) {
511 self.reset = self.reset.saturating_add(1);
512 self.reset_reason = Some(ChannelResetReason::ReceiverDropped);
513 self.receiver_state = ChannelReceiverState::Dropped;
514 self.dropped = self.dropped.saturating_add(1);
515 }
516
517 fn snapshot(
518 &self,
519 connection_id: ConnectionId,
520 channel_id: ChannelId,
521 available_send_credit: Option<u32>,
522 ) -> ChannelDebugSnapshot {
523 ChannelDebugSnapshot {
524 connection_id,
525 channel_id,
526 direction: self.direction,
527 debug: self.debug,
528 initial_credit: self.initial_credit,
529 available_send_credit,
530 inbound_queue_len: Some(self.inbound_queue_len),
531 inbound_queue_capacity: self.inbound_queue_capacity,
532 outbound_runtime_queue_len: None,
533 outbound_runtime_queue_capacity: None,
534 send_waiters_count: None,
535 receiver_state: self.receiver_state,
536 last_item_sent_at: self.last_item_sent_at,
537 last_item_received_at: self.last_item_received_at,
538 last_item_consumed_at: self.last_item_consumed_at,
539 last_credit_granted_at: self.last_credit_granted_at,
540 last_credit_received_at: self.last_credit_received_at,
541 last_credit_granted_amount: self.last_credit_granted_amount,
542 last_credit_received_amount: self.last_credit_received_amount,
543 pending_local_grant_credit: self.pending_local_grant_credit,
544 total_credit_granted: self.total_credit_granted,
545 total_credit_received: self.total_credit_received,
546 current_permit_count: available_send_credit,
547 zero_credit_with_blocked_senders: available_send_credit == Some(0)
548 && self.sends_waited_for_credit > 0,
549 sent: self.sent,
550 sends_started: self.sends_started,
551 sends_completed: self.sends_completed,
552 sends_waited_for_credit: self.sends_waited_for_credit,
553 try_send_full_credit: self.try_send_full_credit,
554 try_send_full_runtime_queue: self.try_send_full_runtime_queue,
555 closed: self.closed,
556 reset: self.reset,
557 dropped: self.dropped,
558 items_received: self.items_received,
559 items_consumed: self.items_consumed,
560 credit_granted: self.credit_granted,
561 credit_received: self.credit_received,
562 close_reason: self.close_reason,
563 reset_reason: self.reset_reason,
564 }
565 }
566}
567
/// State shared by the driver and the handles it gives out. Every mutable
/// member sits behind its own `SyncMutex`.
struct DriverShared {
    /// Id of the connection this state belongs to.
    connection_id: ConnectionId,
    /// One-shot response slots keyed by request id.
    pending_responses: SyncMutex<BTreeMap<RequestId, ResponseSlot>>,
    /// Allocator for request ids.
    request_ids: SyncMutex<IdAllocator<RequestId>>,
    /// Atomic counter for operation ids.
    /// NOTE(review): presumably the next id to hand out — confirm at the use site.
    next_operation_id: AtomicU64,
    /// Operation store.
    operations: Arc<dyn OperationStore>,
    /// Allocator for channel ids.
    channel_ids: SyncMutex<IdAllocator<ChannelId>>,
    /// Sender halves of the inbound channel mailboxes, keyed by channel id.
    channel_senders: SyncMutex<BTreeMap<ChannelId, ChannelMailboxSender<IncomingChannelMessage>>>,
    /// Receiver halves of mailboxes that were created before a receiver was
    /// registered; taken out by `register_inbound_channel_receiver`.
    channel_receivers:
        SyncMutex<BTreeMap<ChannelId, ChannelMailboxReceiver<IncomingChannelMessage>>>,
    /// Per-channel credit semaphores; their available permits are reported as
    /// send credit in debug snapshots.
    channel_credits: SyncMutex<BTreeMap<ChannelId, Arc<Semaphore>>>,
    /// Remembered debug context per channel, used when an event supplies none.
    channel_contexts: SyncMutex<BTreeMap<ChannelId, ChannelDebugContext>>,
    /// Debug bookkeeping for requests.
    request_debug: SyncMutex<BTreeMap<RequestId, RequestRuntimeDebug>>,
    /// Debug bookkeeping for channels.
    channel_debug: SyncMutex<BTreeMap<ChannelId, ChannelRuntimeDebug>>,
    /// Time of the last recorded inbound progress.
    last_inbound_message_at: SyncMutex<Option<Instant>>,
    /// Time of the last recorded outbound progress.
    last_outbound_message_at: SyncMutex<Option<Instant>>,
    /// Close reason, once the connection has been marked closed.
    close_reason: SyncMutex<Option<ConnectionCloseReason>>,
    /// Channels considered terminal; a receiver registered for one of these
    /// gets a mailbox whose sender is dropped immediately.
    terminal_channels: SyncMutex<HashSet<ChannelId>>,
    /// NOTE(review): presumably channels for which a late close should be
    /// ignored — not exercised in this part of the file; confirm.
    stale_close_channels: SyncMutex<std::collections::HashSet<ChannelId>>,
    /// Local initial channel credit; also the capacity of new channel mailboxes.
    local_initial_channel_credit: u32,
    /// Peer's initial channel credit.
    peer_initial_channel_credit: u32,
    /// Optional observer that receives channel events.
    observer: Option<VoxObserverHandle>,
}
612
613impl DriverShared {
614 fn remember_channel_context(
615 &self,
616 channel_id: ChannelId,
617 debug_context: Option<ChannelDebugContext>,
618 ) {
619 if let Some(debug_context) = debug_context.and_then(ChannelDebugContext::into_option) {
620 self.channel_contexts
621 .lock()
622 .insert(channel_id, debug_context);
623 if let Some(channel) = self.channel_debug.lock().get_mut(&channel_id) {
624 channel.debug = Some(debug_context);
625 }
626 }
627 }
628
629 fn channel_event_context(
630 &self,
631 channel_id: ChannelId,
632 debug_context: Option<ChannelDebugContext>,
633 ) -> ChannelEventContext {
634 let debug = debug_context
635 .and_then(ChannelDebugContext::into_option)
636 .or_else(|| self.channel_contexts.lock().get(&channel_id).copied());
637 ChannelEventContext {
638 connection_id: Some(self.connection_id),
639 channel_id,
640 debug,
641 }
642 }
643
644 fn emit_channel_event(
645 &self,
646 channel_id: ChannelId,
647 debug_context: Option<ChannelDebugContext>,
648 event: impl FnOnce(ChannelEventContext) -> ChannelEvent,
649 ) {
650 if let Some(observer) = &self.observer {
651 observer.channel_event(event(self.channel_event_context(channel_id, debug_context)));
652 }
653 }
654
655 fn observe_channel(
656 &self,
657 channel_id: ChannelId,
658 debug_context: Option<ChannelDebugContext>,
659 event: impl FnOnce(ChannelEventContext) -> ChannelEvent,
660 ) {
661 let event = event(self.channel_event_context(channel_id, debug_context));
662 self.record_channel_event(event);
663 if let Some(observer) = &self.observer {
664 observer.channel_event(event);
665 }
666 }
667
668 fn update_channel_debug(
669 &self,
670 channel: ChannelEventContext,
671 default_direction: ChannelDirection,
672 default_initial_credit: u32,
673 update: impl FnOnce(&mut ChannelRuntimeDebug),
674 ) {
675 let mut channels = self.channel_debug.lock();
676 let entry = channels.entry(channel.channel_id).or_insert_with(|| {
677 ChannelRuntimeDebug::new(default_direction, default_initial_credit, channel.debug)
678 });
679 entry.merge_debug(channel.debug);
680 update(entry);
681 }
682
683 fn update_existing_channel_debug(
684 &self,
685 channel_id: ChannelId,
686 update: impl FnOnce(&mut ChannelRuntimeDebug),
687 ) {
688 if let Some(channel) = self.channel_debug.lock().get_mut(&channel_id) {
689 update(channel);
690 }
691 }
692
693 fn record_channel_event(&self, event: ChannelEvent) {
694 let now = Instant::now();
695 match event {
696 ChannelEvent::Opened {
697 channel,
698 direction,
699 initial_credit,
700 } => {
701 self.channel_debug.lock().insert(
702 channel.channel_id,
703 ChannelRuntimeDebug::new(direction, initial_credit, channel.debug),
704 );
705 }
706 ChannelEvent::ItemReceived { channel } => {
707 self.update_channel_debug(channel, ChannelDirection::Rx, 0, |entry| {
708 entry.mark_item_received(now);
709 });
710 }
711 ChannelEvent::Closed { channel, reason } => {
712 self.update_channel_debug(channel, ChannelDirection::Rx, 0, |entry| {
713 entry.mark_closed(reason);
714 });
715 }
716 ChannelEvent::Reset { channel, reason } => {
717 self.update_channel_debug(channel, ChannelDirection::Rx, 0, |entry| {
718 entry.mark_reset(reason);
719 });
720 }
721 ChannelEvent::CreditGranted { channel, amount } => {
722 self.record_credit_granted_at(channel.channel_id, amount, now);
723 }
724 ChannelEvent::SendStarted { channel } => {
725 self.record_send_started(channel.channel_id);
726 }
727 ChannelEvent::SendWaitingForCredit { channel } => {
728 self.record_send_waiting_for_credit(channel.channel_id);
729 }
730 ChannelEvent::SendFinished {
731 channel, outcome, ..
732 } => {
733 self.record_send_finished(channel.channel_id, outcome);
734 }
735 ChannelEvent::TrySend { channel, outcome } => {
736 self.record_try_send_outcome(channel.channel_id, outcome);
737 }
738 ChannelEvent::ItemConsumed { channel } => {
739 self.record_item_consumed(channel.channel_id);
740 }
741 }
742 }
743
744 fn mark_inbound_progress(&self) {
745 *self.last_inbound_message_at.lock() = Some(Instant::now());
746 }
747
748 fn mark_outbound_progress(&self) {
749 *self.last_outbound_message_at.lock() = Some(Instant::now());
750 }
751
752 fn start_request(
753 &self,
754 request_id: RequestId,
755 method_id: vox_types::MethodId,
756 service: Option<&'static str>,
757 method: Option<&'static str>,
758 state: RequestDebugState,
759 ) {
760 self.request_debug.lock().insert(
761 request_id,
762 RequestRuntimeDebug {
763 method_id,
764 service,
765 method,
766 started_at: Instant::now(),
767 state,
768 response_sender_blocked: Some(false),
769 associated_channels: Vec::new(),
770 },
771 );
772 }
773
774 fn finish_request(&self, request_id: RequestId, state: RequestDebugState) {
775 if let Some(request) = self.request_debug.lock().get_mut(&request_id) {
776 request.state = state;
777 }
778 self.request_debug.lock().remove(&request_id);
779 }
780
781 fn record_send_started(&self, channel_id: ChannelId) {
782 self.update_existing_channel_debug(channel_id, ChannelRuntimeDebug::mark_send_started);
783 }
784
785 fn record_send_waiting_for_credit(&self, channel_id: ChannelId) {
786 self.update_existing_channel_debug(
787 channel_id,
788 ChannelRuntimeDebug::mark_send_waiting_for_credit,
789 );
790 }
791
792 fn record_send_finished(&self, channel_id: ChannelId, outcome: ChannelSendOutcome) {
793 let now = Instant::now();
794 self.update_existing_channel_debug(channel_id, |channel| {
795 channel.mark_send_finished(outcome, now);
796 });
797 }
798
799 fn record_try_send_outcome(&self, channel_id: ChannelId, outcome: ChannelTrySendOutcome) {
800 let now = Instant::now();
801 self.update_existing_channel_debug(channel_id, |channel| {
802 channel.mark_try_send_outcome(outcome, now);
803 });
804 }
805
806 fn record_item_consumed(&self, channel_id: ChannelId) {
807 let now = Instant::now();
808 self.update_existing_channel_debug(channel_id, |channel| {
809 channel.mark_item_consumed(now);
810 });
811 }
812
813 fn record_inbound_item_not_enqueued(&self, channel_id: ChannelId) {
814 self.update_existing_channel_debug(
815 channel_id,
816 ChannelRuntimeDebug::mark_inbound_item_not_enqueued,
817 );
818 }
819
820 fn record_pending_local_grant(&self, channel_id: ChannelId, pending: u32) {
821 self.update_existing_channel_debug(channel_id, |channel| {
822 channel.pending_local_grant_credit = pending;
823 });
824 }
825
826 fn record_credit_granted_at(&self, channel_id: ChannelId, amount: u32, now: Instant) {
827 self.update_existing_channel_debug(channel_id, |channel| {
828 channel.mark_credit_granted(amount, now);
829 });
830 }
831
832 fn record_credit_received(&self, channel_id: ChannelId, amount: u32) {
833 let now = Instant::now();
834 self.update_existing_channel_debug(channel_id, |channel| {
835 channel.mark_credit_received(amount, now);
836 });
837 }
838
839 fn record_receiver_dropped(&self, channel_id: ChannelId) {
840 self.update_existing_channel_debug(channel_id, ChannelRuntimeDebug::mark_receiver_dropped);
841 }
842
843 fn new_channel_mailbox(
844 &self,
845 ) -> (
846 ChannelMailboxSender<IncomingChannelMessage>,
847 ChannelMailboxReceiver<IncomingChannelMessage>,
848 ) {
849 channel_mailbox(
850 "driver.channel_mailbox",
851 self.local_initial_channel_credit as usize,
852 )
853 }
854
855 fn inbound_channel_sender(
856 &self,
857 channel_id: ChannelId,
858 ) -> ChannelMailboxSender<IncomingChannelMessage> {
859 let mut senders = self.channel_senders.lock();
860 if let Some(sender) = senders.get(&channel_id) {
861 return sender.clone();
862 }
863
864 let (sender, receiver) = self.new_channel_mailbox();
865 senders.insert(channel_id, sender.clone());
866 self.channel_receivers.lock().insert(channel_id, receiver);
867 sender
868 }
869
870 fn register_inbound_channel_receiver(
871 &self,
872 channel_id: ChannelId,
873 ) -> (ChannelMailboxReceiver<IncomingChannelMessage>, bool) {
874 let terminal = self.terminal_channels.lock().contains(&channel_id);
875 let mut senders = self.channel_senders.lock();
876 let mut receivers = self.channel_receivers.lock();
877
878 if let Some(receiver) = receivers.remove(&channel_id) {
879 return (receiver, terminal);
880 }
881
882 let (sender, receiver) = self.new_channel_mailbox();
883 if terminal {
884 drop(sender);
885 } else {
886 senders.insert(channel_id, sender);
887 }
888 (receiver, terminal)
889 }
890
891 fn debug_snapshot(
892 &self,
893 sender: &ConnectionSender,
894 state: ConnectionDebugState,
895 driver_task_status: DriverTaskStatus,
896 ) -> VoxDebugSnapshot {
897 let now = Instant::now();
898 let requests: Vec<_> = self
899 .request_debug
900 .lock()
901 .iter()
902 .map(|(request_id, request)| request.snapshot(*request_id, now))
903 .collect();
904 let credits = self.shared_channel_credit_snapshot();
905 let open_channels: Vec<_> = self
906 .channel_debug
907 .lock()
908 .iter()
909 .map(|(channel_id, channel)| {
910 channel.snapshot(
911 self.connection_id,
912 *channel_id,
913 credits.get(channel_id).copied().flatten(),
914 )
915 })
916 .collect();
917 let last_inbound_message_at = *self.last_inbound_message_at.lock();
918 let last_outbound_message_at = *self.last_outbound_message_at.lock();
919 let last_progress_at = match (last_inbound_message_at, last_outbound_message_at) {
920 (Some(inbound), Some(outbound)) => Some(inbound.max(outbound)),
921 (Some(inbound), None) => Some(inbound),
922 (None, Some(outbound)) => Some(outbound),
923 (None, None) => None,
924 };
925 let (outbound_queue_depth, outbound_queue_capacity) =
926 sender.sess_core.outbound_queue_stats();
927 VoxDebugSnapshot {
928 connections: vec![ConnectionDebugSnapshot {
929 connection_id: self.connection_id,
930 endpoint: None,
931 surface: None,
932 component: None,
933 state,
934 outstanding_requests: requests.len(),
935 requests,
936 open_channels,
937 outbound_queue_depth: Some(outbound_queue_depth),
938 outbound_queue_capacity: Some(outbound_queue_capacity),
939 local_control_queue_depth: None,
940 local_control_queue_capacity: None,
941 last_inbound_message_at,
942 last_outbound_message_at,
943 last_progress_at,
944 close_reason: *self.close_reason.lock(),
945 driver_task_status,
946 }],
947 }
948 }
949
950 fn shared_channel_credit_snapshot(&self) -> BTreeMap<ChannelId, Option<u32>> {
951 self.channel_credits
952 .lock()
953 .iter()
954 .map(|(channel_id, semaphore)| {
955 (
956 *channel_id,
957 Some(semaphore.available_permits().min(u32::MAX as usize) as u32),
958 )
959 })
960 .collect()
961 }
962
963 fn set_connection_closed(&self, reason: ConnectionCloseReason) {
964 *self.close_reason.lock() = Some(reason);
965 }
966
967 fn connection_debug_state(&self, closed: bool) -> ConnectionDebugState {
968 if closed {
969 ConnectionDebugState::Closed
970 } else {
971 ConnectionDebugState::Open
972 }
973 }
974}
975
/// Guard that sends `request` on `control_tx` when it is dropped.
struct CallerDropGuard {
    /// Unbounded channel on which the drop notification is sent.
    control_tx: mpsc::UnboundedSender<DropControlRequest>,
    /// Request to send on drop.
    request: DropControlRequest,
}
980
981impl Drop for CallerDropGuard {
982 fn drop(&mut self) {
983 let _ = self.control_tx.send(self.request);
984 }
985}
986
#[cfg(test)]
mod tests {
    use super::{DriverChannelCreditReplenisher, DriverLocalControl};
    use vox_types::{ChannelCreditReplenisher, ChannelId};

    /// With a replenisher constructed with 16, seven consumed items produce no
    /// grant within 20 ms, and the eighth produces one grant of 8 for the
    /// channel.
    #[tokio::test]
    async fn replenisher_batches_at_half_the_initial_window() {
        let (tx, mut rx) = moire::sync::mpsc::unbounded_channel("test.replenisher");
        let replenisher = DriverChannelCreditReplenisher::new(
            vox_types::ConnectionId::ROOT,
            ChannelId(7),
            None,
            std::sync::Weak::new(),
            16,
            tx,
            None,
        );

        // One short of the expected batch size: nothing should be emitted yet.
        for _ in 0..7 {
            replenisher.on_item_consumed();
        }
        assert!(
            vox_types::time::tokio::timeout(std::time::Duration::from_millis(20), rx.recv())
                .await
                .is_err(),
            "should not emit credit before reaching the batch threshold"
        );

        // The eighth consumed item triggers the batched grant.
        replenisher.on_item_consumed();
        let Some(DriverLocalControl::GrantCredit {
            channel_id,
            additional,
        }) = rx.recv().await
        else {
            panic!("expected batched credit grant");
        };
        assert_eq!(channel_id, ChannelId(7));
        assert_eq!(additional, 8);
    }

    /// With a replenisher constructed with 1, a single consumed item produces
    /// a grant of 1 for the channel.
    #[tokio::test]
    async fn replenisher_grants_one_by_one_for_single_credit_windows() {
        let (tx, mut rx) = moire::sync::mpsc::unbounded_channel("test.replenisher.single");
        let replenisher = DriverChannelCreditReplenisher::new(
            vox_types::ConnectionId::ROOT,
            ChannelId(9),
            None,
            std::sync::Weak::new(),
            1,
            tx,
            None,
        );

        replenisher.on_item_consumed();
        let Some(DriverLocalControl::GrantCredit {
            channel_id,
            additional,
        }) = rx.recv().await
        else {
            panic!("expected immediate credit grant");
        };
        assert_eq!(channel_id, ChannelId(9));
        assert_eq!(additional, 1);
    }
}
1052
/// Reply sink used by the driver; implements `ReplySink` for one request.
pub struct DriverReplySink {
    /// Connection sender; an `Option` so that `send_reply` can take it out.
    sender: Option<ConnectionSender>,
    /// Request being answered.
    request_id: RequestId,
    /// Method of the request.
    method_id: vox_types::MethodId,
    /// Retry policy of the request.
    retry: vox_types::RetryPolicy,
    /// Operation id, when the request carries one.
    operation_id: Option<OperationId>,
    /// Operation store; when present together with `operation_id`, the encoded
    /// response is sealed into it after the reply is sent.
    operations: Option<Arc<dyn OperationStore>>,
    /// Tracker of live operations, sealed alongside the store when present.
    live_operations: Option<Arc<SyncMutex<LiveOperationTracker>>>,
    /// Channel binder; also gives access to the shared driver state.
    binder: DriverChannelBinder,
    /// Shape of the handler's response type, when known.
    /// NOTE(review): presumably used to prepare schemas on replay — confirm.
    handler_response_shape: Option<&'static facet_core::Shape>,
}
1074
1075async fn replay_sealed_response(
1081 sender: ConnectionSender,
1082 request_id: RequestId,
1083 method_id: vox_types::MethodId,
1084 encoded_response: &[u8],
1085 response_shape: Option<&'static facet_core::Shape>,
1086) -> Result<(), ()> {
1087 let mut response: RequestResponse<'_> =
1088 vox_postcard::from_slice_borrowed(encoded_response).map_err(|_| ())?;
1089 if let Some(shape) = response_shape {
1090 sender.prepare_replay_schemas(request_id, method_id, shape, &mut response);
1091 } else {
1092 response.schemas = Default::default();
1093 }
1094 sender.send_response(request_id, response).await
1095}
1096
1097fn incoming_args_bytes<'a>(call: &'a RequestCall<'a>) -> &'a [u8] {
1098 match &call.args {
1099 Payload::PostcardBytes(bytes) => bytes,
1100 Payload::Value { .. } => {
1101 panic!("incoming request payload should always be decoded as incoming bytes")
1102 }
1103 }
1104}
1105
impl ReplySink for DriverReplySink {
    /// Sends `response` for this sink's request.
    ///
    /// Two paths exist:
    ///
    /// * When both an operation id and an operation store are present, the
    ///   response is prepared, encoded (without its schemas) for the store,
    ///   sent on the wire, sealed in the store, and finally replayed to every
    ///   other request waiting on the same operation.
    /// * Otherwise the response is sent directly via
    ///   `send_response_for_method`.
    ///
    /// A failed wire send is logged and reported through
    /// `mark_failure(.., FailureDisposition::Cancelled)`.
    async fn send_reply(mut self, response: RequestResponse<'_>) {
        // Taking the sender leaves `None` behind, which turns the `Drop` impl
        // (it only acts when a sender is still present) into a no-op.
        let sender = self
            .sender
            .take()
            .expect("unreachable: send_reply takes self by value");

        vox_types::dlog!(
            "[driver] send_reply: conn={:?} req={:?} method={:?} payload={} operation_id={:?}",
            sender.connection_id(),
            self.request_id,
            self.method_id,
            match &response.ret {
                Payload::Value { .. } => "Value",
                Payload::PostcardBytes(_) => "PostcardBytes",
            },
            self.operation_id
        );
        tracing::debug!(
            conn_id = ?sender.connection_id(),
            req_id = ?self.request_id,
            method_id = ?self.method_id,
            operation_id = ?self.operation_id,
            payload = match &response.ret {
                Payload::Value { .. } => "value",
                Payload::PostcardBytes(_) => "postcard-bytes",
            },
            "vox driver sending reply"
        );
        self.binder.shared.mark_outbound_progress();

        // Diagnostics only: log the schema root extracted from the value's shape.
        if let Payload::Value { shape, .. } = &response.ret
            && let Ok(extracted) = vox_types::extract_schemas(shape)
        {
            vox_types::dlog!(
                "[schema] driver send_reply: method={:?} root={:?}",
                self.method_id,
                extracted.root
            );
        }

        if let (Some(operation_id), Some(operations)) = (self.operation_id, self.operations.take())
        {
            let mut response = response;
            sender.prepare_response_for_method(self.request_id, self.method_id, &mut response);

            // The schemas are moved out before encoding so the stored bytes do
            // not contain them; they are put back for the wire send below.
            let schemas_for_wire = std::mem::take(&mut response.schemas);
            #[cfg(not(target_arch = "wasm32"))]
            let encoded_bytes: Vec<u8> =
                vox_jit::encode!(&response).expect("JIT encode failed for response store");
            #[cfg(target_arch = "wasm32")]
            let encoded_bytes: Vec<u8> =
                vox_postcard::to_vec(&response).expect("postcard encode failed for response store");
            let encoded_for_store: PostcardPayload = encoded_bytes.into();
            response.schemas = schemas_for_wire;

            vox_types::dlog!(
                "[driver] send_reply wire send: conn={:?} req={:?} method={:?} schemas={}",
                sender.connection_id(),
                self.request_id,
                self.method_id,
                response.schemas.0.len()
            );
            if let Err(_e) = sender.send_response(self.request_id, response).await {
                tracing::debug!(
                    conn_id = ?sender.connection_id(),
                    req_id = ?self.request_id,
                    method_id = ?self.method_id,
                    "vox driver reply send failed"
                );
                sender.mark_failure(self.request_id, FailureDisposition::Cancelled);
            }

            // The operation is sealed even when the wire send above failed.
            operations.seal(operation_id, self.method_id, &encoded_for_store);

            // Sealing the live-operation entry yields the requests waiting on
            // this operation; no tracker means no waiters.
            let waiters = self
                .live_operations
                .as_ref()
                .map(|lo| lo.lock().seal(operation_id))
                .unwrap_or_default();
            let response_shape = self.handler_response_shape;
            for waiter in waiters {
                // This request was already answered by the wire send above.
                if waiter == self.request_id {
                    continue;
                }
                if replay_sealed_response(
                    sender.clone(),
                    waiter,
                    self.method_id,
                    encoded_for_store.as_bytes(),
                    response_shape,
                )
                .await
                .is_err()
                {
                    sender.mark_failure(waiter, FailureDisposition::Cancelled);
                }
            }
        } else {
            vox_types::dlog!(
                "[driver] send_reply direct send: conn={:?} req={:?} method={:?}",
                sender.connection_id(),
                self.request_id,
                self.method_id
            );
            if let Err(_e) = sender
                .send_response_for_method(self.request_id, self.method_id, response)
                .await
            {
                tracing::debug!(
                    conn_id = ?sender.connection_id(),
                    req_id = ?self.request_id,
                    method_id = ?self.method_id,
                    "vox driver reply send failed"
                );
                sender.mark_failure(self.request_id, FailureDisposition::Cancelled);
            }
        }
    }

    /// Returns the channel binder carried by this sink.
    fn channel_binder(&self) -> Option<&dyn ChannelBinder> {
        Some(&self.binder)
    }

    /// Returns the id of the request this sink replies to.
    fn request_id(&self) -> Option<RequestId> {
        Some(self.request_id)
    }

    /// Returns the connection id of the sender, or `None` once the sender has
    /// been taken.
    fn connection_id(&self) -> Option<vox_types::ConnectionId> {
        self.sender.as_ref().map(|sender| sender.connection_id())
    }
}
1242
/// Reports a failure for the request when the sink is dropped while it still
/// holds its sender, i.e. `send_reply` never took it.
impl Drop for DriverReplySink {
    fn drop(&mut self) {
        if let Some(sender) = self.sender.take() {
            // Persisting retry policies report `Indeterminate`, all others
            // report `Cancelled`.
            let disposition = if self.retry.persist {
                FailureDisposition::Indeterminate
            } else {
                FailureDisposition::Cancelled
            };

            if let Some(operation_id) = self.operation_id {
                // When the tracker still has an entry for this operation, every
                // waiter recorded on it is failed and we return early.
                // NOTE(review): presumably the waiter list includes this sink's
                // own request id, since it is not marked separately on this
                // path; confirm against `LiveOperationTracker::release`.
                if let Some(live_ops) = self.live_operations.take()
                    && let Some(live) = live_ops.lock().release(operation_id)
                {
                    for waiter in live.waiters {
                        sender.mark_failure(waiter, disposition);
                    }
                    return;
                }
            }

            sender.mark_failure(self.request_id, disposition);
        }
    }
}
1273
/// [`ChannelSink`] that writes the items and close message of one channel
/// through a [`ConnectionSender`].
pub struct DriverChannelSink {
    // Sender used for channel item and close messages.
    sender: ConnectionSender,
    // Driver state shared across the connection (terminal channel set,
    // channel contexts, observer, send bookkeeping).
    shared: Arc<DriverShared>,
    // Channel this sink writes to.
    channel_id: ChannelId,
    // Optional debug context; when absent, `debug_context()` falls back to
    // the context remembered in `shared`.
    debug_context: Option<ChannelDebugContext>,
    // Used to enqueue a `CloseChannel` control message when the sink is
    // closed on drop.
    local_control_tx: mpsc::UnboundedSender<DriverLocalControl>,
}
1288
1289impl ChannelSink for DriverChannelSink {
1290 fn send_payload<'payload>(
1291 &self,
1292 payload: Payload<'payload>,
1293 ) -> Pin<Box<dyn vox_types::MaybeSendFuture<Output = Result<(), TxError>> + 'payload>> {
1294 let sender = self.sender.clone();
1295 let shared = Arc::clone(&self.shared);
1296 let channel_id = self.channel_id;
1297 Box::pin(async move {
1298 if shared.terminal_channels.lock().contains(&channel_id) {
1299 return Err(TxError::Transport("channel closed".into()));
1300 }
1301
1302 shared.mark_outbound_progress();
1303 sender
1304 .send(ConnectionMessage::Channel(ChannelMessage {
1305 id: channel_id,
1306 body: ChannelBody::Item(ChannelItem { item: payload }),
1307 }))
1308 .await
1309 .map_err(|()| TxError::Transport("connection closed".into()))
1310 })
1311 }
1312
1313 fn channel_id(&self) -> Option<ChannelId> {
1314 Some(self.channel_id)
1315 }
1316
1317 fn connection_id(&self) -> Option<vox_types::ConnectionId> {
1318 Some(self.sender.connection_id())
1319 }
1320
1321 fn debug_context(&self) -> Option<ChannelDebugContext> {
1322 self.debug_context
1323 .and_then(ChannelDebugContext::into_option)
1324 .or_else(|| {
1325 self.shared
1326 .channel_contexts
1327 .lock()
1328 .get(&self.channel_id)
1329 .copied()
1330 })
1331 }
1332
1333 fn observer(&self) -> Option<VoxObserverHandle> {
1334 self.shared.observer.clone()
1335 }
1336
1337 fn note_send_started(&self) {
1338 self.shared.record_send_started(self.channel_id);
1339 }
1340
1341 fn note_send_waiting_for_credit(&self) {
1342 self.shared.record_send_waiting_for_credit(self.channel_id);
1343 }
1344
1345 fn note_send_finished(&self, outcome: ChannelSendOutcome) {
1346 self.shared.record_send_finished(self.channel_id, outcome);
1347 }
1348
1349 fn note_try_send_outcome(&self, outcome: ChannelTrySendOutcome) {
1350 self.shared
1351 .record_try_send_outcome(self.channel_id, outcome);
1352 }
1353
1354 fn try_send_payload_with_outcome<'payload>(
1357 &self,
1358 payload: Payload<'payload>,
1359 ) -> Result<(), ChannelTrySendOutcome> {
1360 if self
1361 .shared
1362 .terminal_channels
1363 .lock()
1364 .contains(&self.channel_id)
1365 {
1366 return Err(ChannelTrySendOutcome::Closed);
1367 }
1368
1369 self.shared.mark_outbound_progress();
1370 self.sender
1371 .try_send(ConnectionMessage::Channel(ChannelMessage {
1372 id: self.channel_id,
1373 body: ChannelBody::Item(ChannelItem { item: payload }),
1374 }))
1375 .map_err(|err| match err {
1376 TrySendError::Closed(()) => ChannelTrySendOutcome::Closed,
1377 TrySendError::Full(()) => ChannelTrySendOutcome::FullRuntimeQueue,
1378 })
1379 }
1380
1381 fn close_channel(
1382 &self,
1383 _metadata: vox_types::Metadata,
1384 ) -> Pin<Box<dyn vox_types::MaybeSendFuture<Output = Result<(), TxError>> + 'static>> {
1385 let sender = self.sender.clone();
1389 let shared = Arc::clone(&self.shared);
1390 let channel_id = self.channel_id;
1391 let debug_context = self.debug_context;
1392 Box::pin(async move {
1393 shared.terminal_channels.lock().insert(channel_id);
1394 shared.observe_channel(channel_id, debug_context, |channel| ChannelEvent::Closed {
1395 channel,
1396 reason: ChannelCloseReason::Local,
1397 });
1398
1399 shared.mark_outbound_progress();
1400 sender
1401 .send(ConnectionMessage::Channel(ChannelMessage {
1402 id: channel_id,
1403 body: ChannelBody::Close(ChannelClose {
1404 metadata: Default::default(),
1405 }),
1406 }))
1407 .await
1408 .map_err(|()| TxError::Transport("connection closed".into()))
1409 })
1410 }
1411
1412 fn close_channel_on_drop(&self) {
1413 self.shared.terminal_channels.lock().insert(self.channel_id);
1414 self.shared
1415 .observe_channel(self.channel_id, self.debug_context, |channel| {
1416 ChannelEvent::Closed {
1417 channel,
1418 reason: ChannelCloseReason::Dropped,
1419 }
1420 });
1421 let _ = self
1422 .local_control_tx
1423 .send(DriverLocalControl::CloseChannel {
1424 channel_id: self.channel_id,
1425 });
1426 }
1427}
1428
/// Variant of [`Handler<DriverReplySink>`] whose handling method returns a
/// boxed future instead of being an `async fn`.
///
/// Every `Handler<DriverReplySink>` implements this trait through a blanket
/// impl, and `Box<dyn ErasedHandler>` implements `Handler<DriverReplySink>`
/// in turn.
pub trait ErasedHandler: MaybeSend + MaybeSync + 'static {
    /// Retry policy for `method_id`; defaults to `RetryPolicy::VOLATILE`.
    fn retry_policy(&self, method_id: vox_types::MethodId) -> vox_types::RetryPolicy {
        let _ = method_id;
        vox_types::RetryPolicy::VOLATILE
    }

    /// Whether the arguments of `method_id` contain channels; defaults to
    /// `false`.
    fn args_have_channels(&self, method_id: vox_types::MethodId) -> bool {
        let _ = method_id;
        false
    }

    /// Wire shape of the response of `method_id`; defaults to `None`.
    fn response_wire_shape(&self, method_id: vox_types::MethodId) -> Option<&'static facet::Shape> {
        let _ = method_id;
        None
    }

    /// Handles `call`, replying through `reply`, as a boxed future.
    fn handle_erased(
        &self,
        call: SelfRef<RequestCall<'static>>,
        reply: DriverReplySink,
        schemas: std::sync::Arc<vox_types::SchemaRecvTracker>,
    ) -> BoxFut<'_, ()>;
}
1456
1457impl<H: Handler<DriverReplySink>> ErasedHandler for H {
1458 fn retry_policy(&self, method_id: vox_types::MethodId) -> vox_types::RetryPolicy {
1459 Handler::retry_policy(self, method_id)
1460 }
1461
1462 fn args_have_channels(&self, method_id: vox_types::MethodId) -> bool {
1463 Handler::args_have_channels(self, method_id)
1464 }
1465
1466 fn response_wire_shape(&self, method_id: vox_types::MethodId) -> Option<&'static facet::Shape> {
1467 Handler::response_wire_shape(self, method_id)
1468 }
1469
1470 fn handle_erased(
1471 &self,
1472 call: SelfRef<RequestCall<'static>>,
1473 reply: DriverReplySink,
1474 schemas: std::sync::Arc<vox_types::SchemaRecvTracker>,
1475 ) -> BoxFut<'_, ()> {
1476 Box::pin(Handler::handle(self, call, reply, schemas))
1477 }
1478}
1479
1480impl Handler<DriverReplySink> for Box<dyn ErasedHandler> {
1481 fn retry_policy(&self, method_id: vox_types::MethodId) -> vox_types::RetryPolicy {
1482 (**self).retry_policy(method_id)
1483 }
1484
1485 fn args_have_channels(&self, method_id: vox_types::MethodId) -> bool {
1486 (**self).args_have_channels(method_id)
1487 }
1488
1489 fn response_wire_shape(&self, method_id: vox_types::MethodId) -> Option<&'static facet::Shape> {
1490 (**self).response_wire_shape(method_id)
1491 }
1492
1493 async fn handle(
1494 &self,
1495 call: SelfRef<RequestCall<'static>>,
1496 reply: DriverReplySink,
1497 schemas: std::sync::Arc<vox_types::SchemaRecvTracker>,
1498 ) {
1499 (**self).handle_erased(call, reply, schemas).await
1500 }
1501}
1502
/// Cloneable handle for issuing calls through a [`DriverCaller`], optionally
/// wrapped by client middleware.
#[must_use = "Dropping this caller may close the connection if it is the last caller."]
#[derive(Clone)]
pub struct Caller {
    // Shared driver-level caller that performs the calls.
    inner: Arc<DriverCaller>,
    // Service descriptor recorded by the first `with_middleware` call;
    // `None` until middleware is installed.
    service: Option<&'static vox_types::ServiceDescriptor>,
    // Middleware in registration order.
    middlewares: Vec<Arc<dyn vox_types::ClientMiddleware>>,
}
1515
1516impl Caller {
1517 pub fn new(driver: DriverCaller) -> Self {
1519 Self {
1520 inner: Arc::new(driver),
1521 service: None,
1522 middlewares: vec![],
1523 }
1524 }
1525
1526 #[cfg(test)]
1528 pub(crate) fn driver(&self) -> &DriverCaller {
1529 &self.inner
1530 }
1531
1532 pub fn with_middleware(
1534 mut self,
1535 service: &'static vox_types::ServiceDescriptor,
1536 middleware: impl vox_types::ClientMiddleware,
1537 ) -> Self {
1538 if let Some(existing_service) = self.service {
1539 assert_eq!(
1540 existing_service.service_name, service.service_name,
1541 "Caller middleware service mismatch"
1542 );
1543 } else {
1544 self.service = Some(service);
1545 }
1546 self.middlewares.push(Arc::new(middleware));
1547 self
1548 }
1549
1550 pub async fn call(&self, mut call: RequestCall<'_>) -> CallResult {
1553 use vox_types::{
1554 ClientCallOutcome, ClientContext, ClientRequest, Extensions, OwnedMetadata,
1555 };
1556
1557 let Some(service) = self.service else {
1558 return self.inner.call_inner(call, None).await;
1559 };
1560
1561 let extensions = Extensions::new();
1562 let method = service.by_id(call.method_id);
1563 let context = ClientContext::new(method, call.method_id, &extensions);
1564 let mut owned_metadata = OwnedMetadata::default();
1565
1566 if !self.middlewares.is_empty() {
1567 for middleware in &self.middlewares {
1568 let mut request = ClientRequest::new(&mut call, &mut owned_metadata);
1569 middleware.pre(&context, &mut request).await;
1570 }
1571 }
1572
1573 let request_debug = method.map(|method| (method.service_name, method.method_name));
1574 let result = self.inner.call_inner(call, request_debug).await;
1575 if !self.middlewares.is_empty() {
1576 let outcome = match &result {
1577 Ok(_) => ClientCallOutcome::Response,
1578 Err(error) => ClientCallOutcome::Error(error),
1579 };
1580 for middleware in self.middlewares.iter().rev() {
1581 middleware.post(&context, outcome).await;
1582 }
1583 }
1584 result
1585 }
1586
1587 pub async fn closed(&self) {
1589 if self.inner.closed_rx.borrow().is_some() {
1590 return;
1591 }
1592 let mut rx = self.inner.closed_rx.clone();
1593 while rx.changed().await.is_ok() {
1594 if rx.borrow().is_some() {
1595 return;
1596 }
1597 }
1598 }
1599
1600 pub fn is_connected(&self) -> bool {
1602 self.inner.closed_rx.borrow().is_none()
1603 }
1604
1605 pub fn channel_binder(&self) -> Option<&dyn ChannelBinder> {
1607 Some(self.inner.as_ref())
1608 }
1609
1610 pub fn debug_snapshot(&self) -> VoxDebugSnapshot {
1612 self.inner.debug_snapshot()
1613 }
1614
1615 pub fn dump_debug_snapshot(&self) -> VoxDebugSnapshot {
1616 let snapshot = self.debug_snapshot();
1617 tracing::info!(?snapshot, "vox debug snapshot");
1618 snapshot
1619 }
1620}
1621
/// Implemented by client types that can be constructed from a [`Caller`] and
/// an optional session handle.
pub trait FromVoxSession {
    /// Name of the service the client type targets.
    const SERVICE_NAME: &'static str;

    /// Builds the client from `caller` and, when available, the handle of the
    /// session it belongs to.
    fn from_vox_session(
        caller: Caller,
        session_handle: Option<crate::session::SessionHandle>,
    ) -> Self;
}
1637
/// Client that only carries a [`Caller`] and an optional session handle; its
/// [`FromVoxSession::SERVICE_NAME`] is `"Noop"`.
#[must_use = "Dropping NoopClient may close the connection if it is the last caller."]
#[derive(Clone)]
pub struct NoopClient {
    /// Caller this client was built from.
    pub caller: Caller,
    /// Session handle this client was built with, if any.
    pub session: Option<crate::session::SessionHandle>,
}
1650
1651impl FromVoxSession for NoopClient {
1652 const SERVICE_NAME: &'static str = "Noop";
1653
1654 fn from_vox_session(caller: Caller, session: Option<crate::session::SessionHandle>) -> Self {
1655 Self { caller, session }
1656 }
1657}
1658
/// [`ChannelBinder`] carried by a [`DriverReplySink`]; creates and binds
/// channels on the driver's connection.
#[derive(Clone)]
struct DriverChannelBinder {
    // Sender handed to the transmit sinks this binder creates.
    sender: ConnectionSender,
    // Shared driver state (id allocation, credits, observer).
    shared: Arc<DriverShared>,
    // Control queue handed to the sinks and receivers this binder creates.
    local_control_tx: mpsc::UnboundedSender<DriverLocalControl>,
    // When present, exposed as the channel liveness handle.
    drop_guard: Option<Arc<CallerDropGuard>>,
}
1666
1667fn register_rx_channel_impl(
1668 shared: &Arc<DriverShared>,
1669 channel_id: ChannelId,
1670 initial_channel_credit: u32,
1671 debug_context: Option<ChannelDebugContext>,
1672 liveness: Option<ChannelLivenessHandle>,
1673 local_control_tx: mpsc::UnboundedSender<DriverLocalControl>,
1674) -> vox_types::BoundChannelReceiver {
1675 observe_channel_opened(
1676 shared,
1677 channel_id,
1678 ChannelDirection::Rx,
1679 initial_channel_credit,
1680 debug_context,
1681 );
1682 let (rx, terminal) = shared.register_inbound_channel_receiver(channel_id);
1683
1684 if terminal {
1685 shared.channel_credits.lock().remove(&channel_id);
1686 return vox_types::BoundChannelReceiver {
1687 receiver: rx,
1688 liveness,
1689 replenisher: None,
1690 };
1691 }
1692
1693 vox_types::BoundChannelReceiver {
1694 receiver: rx,
1695 liveness,
1696 replenisher: Some(Arc::new(DriverChannelCreditReplenisher::new(
1697 shared.connection_id,
1698 channel_id,
1699 debug_context,
1700 Arc::downgrade(shared),
1701 initial_channel_credit,
1702 local_control_tx,
1703 shared.observer.clone(),
1704 )) as ChannelCreditReplenisherHandle),
1705 }
1706}
1707
/// Remembers the debug context of `channel_id` in the shared state and
/// reports a `ChannelEvent::Opened` with the given direction and initial
/// credit through `DriverShared::observe_channel`.
fn observe_channel_opened(
    shared: &DriverShared,
    channel_id: ChannelId,
    direction: ChannelDirection,
    initial_credit: u32,
    debug_context: Option<ChannelDebugContext>,
) {
    shared.remember_channel_context(channel_id, debug_context);
    shared.observe_channel(channel_id, debug_context, |channel| ChannelEvent::Opened {
        channel,
        direction,
        initial_credit,
    });
}
1723
1724fn make_tx_channel_sink(
1725 sender: &ConnectionSender,
1726 shared: &Arc<DriverShared>,
1727 local_control_tx: &mpsc::UnboundedSender<DriverLocalControl>,
1728 channel_id: ChannelId,
1729 debug_context: Option<ChannelDebugContext>,
1730) -> Arc<CreditSink<DriverChannelSink>> {
1731 observe_channel_opened(
1732 shared,
1733 channel_id,
1734 ChannelDirection::Tx,
1735 shared.peer_initial_channel_credit,
1736 debug_context,
1737 );
1738 let inner = DriverChannelSink {
1739 sender: sender.clone(),
1740 shared: Arc::clone(shared),
1741 channel_id,
1742 debug_context: debug_context.and_then(ChannelDebugContext::into_option),
1743 local_control_tx: local_control_tx.clone(),
1744 };
1745 let sink = Arc::new(CreditSink::new(inner, shared.peer_initial_channel_credit));
1746 shared
1747 .channel_credits
1748 .lock()
1749 .insert(channel_id, Arc::clone(sink.credit()));
1750 sink
1751}
1752
1753trait DriverChannelEndpoint {
1754 fn endpoint_sender(&self) -> &ConnectionSender;
1755 fn endpoint_shared(&self) -> &Arc<DriverShared>;
1756 fn endpoint_local_control_tx(&self) -> &mpsc::UnboundedSender<DriverLocalControl>;
1757 fn endpoint_liveness(&self) -> Option<ChannelLivenessHandle>;
1758
1759 fn create_tx_credit_sink(
1760 &self,
1761 debug_context: Option<ChannelDebugContext>,
1762 ) -> (ChannelId, Arc<CreditSink<DriverChannelSink>>) {
1763 let shared = self.endpoint_shared();
1764 let channel_id = shared.channel_ids.lock().alloc();
1765 let sink = make_tx_channel_sink(
1766 self.endpoint_sender(),
1767 shared,
1768 self.endpoint_local_control_tx(),
1769 channel_id,
1770 debug_context,
1771 );
1772 (channel_id, sink)
1773 }
1774
1775 fn create_tx_dyn(
1776 &self,
1777 debug_context: Option<ChannelDebugContext>,
1778 ) -> (ChannelId, Arc<dyn ChannelSink>) {
1779 let (id, sink) = self.create_tx_credit_sink(debug_context);
1780 (id, sink as Arc<dyn ChannelSink>)
1781 }
1782
1783 fn create_rx_bound(
1784 &self,
1785 debug_context: Option<ChannelDebugContext>,
1786 ) -> (ChannelId, vox_types::BoundChannelReceiver) {
1787 let channel_id = self.endpoint_shared().channel_ids.lock().alloc();
1788 let rx = self.register_rx_bound(channel_id, debug_context);
1789 (channel_id, rx)
1790 }
1791
1792 fn bind_tx_dyn(
1793 &self,
1794 channel_id: ChannelId,
1795 debug_context: Option<ChannelDebugContext>,
1796 ) -> Arc<dyn ChannelSink> {
1797 make_tx_channel_sink(
1798 self.endpoint_sender(),
1799 self.endpoint_shared(),
1800 self.endpoint_local_control_tx(),
1801 channel_id,
1802 debug_context,
1803 )
1804 }
1805
1806 fn register_rx_bound(
1807 &self,
1808 channel_id: ChannelId,
1809 debug_context: Option<ChannelDebugContext>,
1810 ) -> vox_types::BoundChannelReceiver {
1811 let shared = self.endpoint_shared();
1812 register_rx_channel_impl(
1813 shared,
1814 channel_id,
1815 shared.local_initial_channel_credit,
1816 debug_context,
1817 self.endpoint_liveness(),
1818 self.endpoint_local_control_tx().clone(),
1819 )
1820 }
1821}
1822
1823impl DriverChannelEndpoint for DriverChannelBinder {
1824 fn endpoint_sender(&self) -> &ConnectionSender {
1825 &self.sender
1826 }
1827
1828 fn endpoint_shared(&self) -> &Arc<DriverShared> {
1829 &self.shared
1830 }
1831
1832 fn endpoint_local_control_tx(&self) -> &mpsc::UnboundedSender<DriverLocalControl> {
1833 &self.local_control_tx
1834 }
1835
1836 fn endpoint_liveness(&self) -> Option<ChannelLivenessHandle> {
1837 self.drop_guard
1838 .as_ref()
1839 .map(|guard| guard.clone() as ChannelLivenessHandle)
1840 }
1841}
1842
1843impl ChannelBinder for DriverChannelBinder {
1844 fn create_tx(&self) -> (ChannelId, Arc<dyn ChannelSink>) {
1845 self.create_tx_dyn(None)
1846 }
1847
1848 fn create_tx_with_context(
1849 &self,
1850 debug_context: Option<ChannelDebugContext>,
1851 ) -> (ChannelId, Arc<dyn ChannelSink>) {
1852 self.create_tx_dyn(debug_context)
1853 }
1854
1855 fn create_rx(&self) -> (ChannelId, vox_types::BoundChannelReceiver) {
1856 self.create_rx_bound(None)
1857 }
1858
1859 fn create_rx_with_context(
1860 &self,
1861 debug_context: Option<ChannelDebugContext>,
1862 ) -> (ChannelId, vox_types::BoundChannelReceiver) {
1863 self.create_rx_bound(debug_context)
1864 }
1865
1866 fn bind_tx(&self, channel_id: ChannelId) -> Arc<dyn ChannelSink> {
1867 self.bind_tx_dyn(channel_id, None)
1868 }
1869
1870 fn bind_tx_with_context(
1871 &self,
1872 channel_id: ChannelId,
1873 debug_context: Option<ChannelDebugContext>,
1874 ) -> Arc<dyn ChannelSink> {
1875 self.bind_tx_dyn(channel_id, debug_context)
1876 }
1877
1878 fn register_rx(&self, channel_id: ChannelId) -> vox_types::BoundChannelReceiver {
1879 self.register_rx_bound(channel_id, None)
1880 }
1881
1882 fn register_rx_with_context(
1883 &self,
1884 channel_id: ChannelId,
1885 debug_context: Option<ChannelDebugContext>,
1886 ) -> vox_types::BoundChannelReceiver {
1887 self.register_rx_bound(channel_id, debug_context)
1888 }
1889
1890 fn channel_liveness(&self) -> Option<ChannelLivenessHandle> {
1891 self.endpoint_liveness()
1892 }
1893}
1894
/// Driver-level caller: sends requests over a connection, waits for the
/// matching responses, and creates channels on that connection.
#[derive(Clone)]
pub struct DriverCaller {
    // Sender used for outgoing requests and created transmit sinks.
    sender: ConnectionSender,
    // Shared driver state (request ids, pending responses, observer, ...).
    shared: Arc<DriverShared>,
    // Control queue handed to created sinks and receivers.
    local_control_tx: mpsc::UnboundedSender<DriverLocalControl>,
    // Holds `Some(reason)` once the connection is closed.
    closed_rx: watch::Receiver<Option<ConnectionCloseReason>>,
    // Resume generation counter; a change makes in-flight calls react.
    resumed_rx: watch::Receiver<u64>,
    // In-flight calls wait for this to reach the resume generation before
    // resending.
    resume_processed_rx: watch::Receiver<u64>,
    // Enables operation ids and the resume handling in `call_inner`.
    peer_supports_retry: bool,
    // When present, exposed as the channel liveness handle.
    _drop_guard: Option<Arc<CallerDropGuard>>,
}
1909
1910impl DriverCaller {
1911 pub fn create_tx_channel(&self) -> (ChannelId, Arc<CreditSink<DriverChannelSink>>) {
1916 self.create_tx_credit_sink(None)
1917 }
1918
1919 #[cfg(test)]
1924 pub(crate) fn connection_sender(&self) -> &ConnectionSender {
1925 &self.sender
1926 }
1927
1928 pub fn register_rx_channel(&self, channel_id: ChannelId) -> vox_types::BoundChannelReceiver {
1933 self.register_rx_bound(channel_id, None)
1934 }
1935}
1936
1937impl DriverChannelEndpoint for DriverCaller {
1938 fn endpoint_sender(&self) -> &ConnectionSender {
1939 &self.sender
1940 }
1941
1942 fn endpoint_shared(&self) -> &Arc<DriverShared> {
1943 &self.shared
1944 }
1945
1946 fn endpoint_local_control_tx(&self) -> &mpsc::UnboundedSender<DriverLocalControl> {
1947 &self.local_control_tx
1948 }
1949
1950 fn endpoint_liveness(&self) -> Option<ChannelLivenessHandle> {
1951 self._drop_guard
1952 .as_ref()
1953 .map(|guard| guard.clone() as ChannelLivenessHandle)
1954 }
1955}
1956
1957impl ChannelBinder for DriverCaller {
1958 fn create_tx(&self) -> (ChannelId, Arc<dyn ChannelSink>) {
1959 self.create_tx_dyn(None)
1960 }
1961
1962 fn create_tx_with_context(
1963 &self,
1964 debug_context: Option<ChannelDebugContext>,
1965 ) -> (ChannelId, Arc<dyn ChannelSink>) {
1966 self.create_tx_dyn(debug_context)
1967 }
1968
1969 fn create_rx(&self) -> (ChannelId, vox_types::BoundChannelReceiver) {
1970 self.create_rx_bound(None)
1971 }
1972
1973 fn create_rx_with_context(
1974 &self,
1975 debug_context: Option<ChannelDebugContext>,
1976 ) -> (ChannelId, vox_types::BoundChannelReceiver) {
1977 self.create_rx_bound(debug_context)
1978 }
1979
1980 fn bind_tx(&self, channel_id: ChannelId) -> Arc<dyn ChannelSink> {
1981 self.bind_tx_dyn(channel_id, None)
1982 }
1983
1984 fn bind_tx_with_context(
1985 &self,
1986 channel_id: ChannelId,
1987 debug_context: Option<ChannelDebugContext>,
1988 ) -> Arc<dyn ChannelSink> {
1989 self.bind_tx_dyn(channel_id, debug_context)
1990 }
1991
1992 fn register_rx(&self, channel_id: ChannelId) -> vox_types::BoundChannelReceiver {
1993 self.register_rx_bound(channel_id, None)
1994 }
1995
1996 fn register_rx_with_context(
1997 &self,
1998 channel_id: ChannelId,
1999 debug_context: Option<ChannelDebugContext>,
2000 ) -> vox_types::BoundChannelReceiver {
2001 self.register_rx_bound(channel_id, debug_context)
2002 }
2003
2004 fn channel_liveness(&self) -> Option<ChannelLivenessHandle> {
2005 self.endpoint_liveness()
2006 }
2007}
2008
2009impl DriverCaller {
2010 pub fn debug_snapshot(&self) -> VoxDebugSnapshot {
2012 self.shared.debug_snapshot(
2013 &self.sender,
2014 self.shared
2015 .connection_debug_state(self.closed_rx.borrow().is_some()),
2016 if self.closed_rx.borrow().is_some() {
2017 DriverTaskStatus::Dead
2018 } else {
2019 DriverTaskStatus::Alive
2020 },
2021 )
2022 }
2023
2024 pub fn dump_debug_snapshot(&self) -> VoxDebugSnapshot {
2025 let snapshot = self.debug_snapshot();
2026 tracing::info!(?snapshot, "vox debug snapshot");
2027 snapshot
2028 }
2029
2030 async fn call_inner(
2032 &self,
2033 mut call: RequestCall<'_>,
2034 request_debug: Option<(&'static str, &'static str)>,
2035 ) -> CallResult {
2036 if self.peer_supports_retry {
2037 let operation_id = OperationId(
2038 self.shared
2039 .next_operation_id
2040 .fetch_add(1, Ordering::Relaxed),
2041 );
2042 ensure_operation_id(&mut call.metadata, operation_id);
2043 }
2044
2045 let req_id = self.shared.request_ids.lock().alloc();
2047 let request_started_at = Instant::now();
2048 let (service_name, method_name) = request_debug.unwrap_or(("<unknown>", "<unknown>"));
2049 tracing::debug!(
2050 conn_id = ?self.sender.connection_id(),
2051 ?req_id,
2052 method_id = ?call.method_id,
2053 service = service_name,
2054 method = method_name,
2055 "vox caller starting request"
2056 );
2057 if let Some(observer) = &self.shared.observer {
2058 observer.driver_event(DriverEvent::RequestStarted {
2059 connection_id: self.sender.connection_id(),
2060 request_id: req_id,
2061 method_id: call.method_id,
2062 });
2063 }
2064 let finish_request = |outcome: RpcOutcome| {
2065 self.shared.finish_request(
2066 req_id,
2067 if outcome == RpcOutcome::Ok {
2068 RequestDebugState::Finished
2069 } else {
2070 RequestDebugState::Failed
2071 },
2072 );
2073 if let Some(observer) = &self.shared.observer {
2074 observer.driver_event(DriverEvent::RequestFinished {
2075 connection_id: self.sender.connection_id(),
2076 request_id: req_id,
2077 outcome,
2078 elapsed: request_started_at.elapsed(),
2079 });
2080 }
2081 };
2082
2083 let (tx, rx) = moire::sync::oneshot::channel("driver.response");
2086 self.shared.pending_responses.lock().insert(req_id, tx);
2087 self.shared.start_request(
2088 req_id,
2089 call.method_id,
2090 Some(service_name),
2091 Some(method_name),
2092 RequestDebugState::WaitingForResponse,
2093 );
2094
2095 self.shared.mark_outbound_progress();
2103 tracing::debug!(
2104 conn_id = ?self.sender.connection_id(),
2105 ?req_id,
2106 method_id = ?call.method_id,
2107 service = service_name,
2108 method = method_name,
2109 "vox caller sending request"
2110 );
2111 if self
2112 .sender
2113 .send_with_binder(
2114 ConnectionMessage::Request(RequestMessage {
2115 id: req_id,
2116 body: RequestBody::Call(RequestCall {
2117 method_id: call.method_id,
2118 args: call.args.reborrow(),
2119 metadata: call.metadata.clone(),
2120 schemas: Default::default(),
2121 }),
2122 }),
2123 Some(self),
2124 )
2125 .await
2126 .is_err()
2127 {
2128 tracing::debug!(
2129 conn_id = ?self.sender.connection_id(),
2130 ?req_id,
2131 method_id = ?call.method_id,
2132 service = service_name,
2133 method = method_name,
2134 "vox caller request send failed"
2135 );
2136 self.shared.pending_responses.lock().remove(&req_id);
2137 finish_request(RpcOutcome::SendFailed);
2138 return Err(VoxError::SendFailed);
2139 }
2140 tracing::debug!(
2141 conn_id = ?self.sender.connection_id(),
2142 ?req_id,
2143 method_id = ?call.method_id,
2144 service = service_name,
2145 method = method_name,
2146 "vox caller request sent; waiting for response"
2147 );
2148
2149 let mut resumed_rx = self.resumed_rx.clone();
2150 let mut seen_resume_generation = *resumed_rx.borrow();
2151 let mut resume_processed_rx = self.resume_processed_rx.clone();
2152 let mut closed_rx = self.closed_rx.clone();
2153 let mut response = std::pin::pin!(rx.named("awaiting_response"));
2154
2155 let pending: PendingResponse = loop {
2156 tokio::select! {
2157 result = &mut response => {
2158 match result {
2159 Ok(pending) => {
2160 tracing::debug!(
2161 conn_id = ?self.sender.connection_id(),
2162 ?req_id,
2163 method_id = ?call.method_id,
2164 service = service_name,
2165 method = method_name,
2166 "vox caller received response"
2167 );
2168 break pending;
2169 }
2170 Err(_) => {
2171 finish_request(RpcOutcome::Closed);
2172 return Err(VoxError::ConnectionClosed);
2173 }
2174 }
2175 }
2176 changed = resumed_rx.changed(), if self.peer_supports_retry => {
2177 vox_types::dlog!("[CALLER] resumed_rx fired");
2178 if changed.is_err() {
2179 self.shared.pending_responses.lock().remove(&req_id);
2180 finish_request(RpcOutcome::Closed);
2181 return Err(VoxError::SessionShutdown);
2182 }
2183 let generation = *resumed_rx.borrow();
2184 if generation == seen_resume_generation {
2185 continue;
2186 }
2187 seen_resume_generation = generation;
2188 while *resume_processed_rx.borrow() < generation {
2189 if resume_processed_rx.changed().await.is_err() {
2190 self.shared.pending_responses.lock().remove(&req_id);
2191 finish_request(RpcOutcome::Closed);
2192 return Err(VoxError::SessionShutdown);
2193 }
2194 }
2195 match metadata_channel_retry_mode(&call.metadata) {
2196 ChannelRetryMode::NonIdem => {
2197 self.shared.pending_responses.lock().remove(&req_id);
2198 finish_request(RpcOutcome::Indeterminate);
2199 return Err(VoxError::Indeterminate);
2200 }
2201 ChannelRetryMode::Idem | ChannelRetryMode::None => {}
2202 }
2203 self.shared.mark_outbound_progress();
2207 let _ = self.sender.send_with_binder(
2208 ConnectionMessage::Request(RequestMessage {
2209 id: req_id,
2210 body: RequestBody::Call(RequestCall {
2211 method_id: call.method_id,
2212 args: call.args.reborrow(),
2213 metadata: call.metadata.clone(),
2214 schemas: Default::default(),
2215 }),
2216 }),
2217 Some(self),
2218 ).await;
2219 }
2220 changed = closed_rx.changed() => {
2221 vox_types::dlog!("[CALLER] closed_rx fired, value={:?}", *closed_rx.borrow());
2222 if changed.is_err() || closed_rx.borrow().is_some() {
2223 self.shared.pending_responses.lock().remove(&req_id);
2224 finish_request(RpcOutcome::Closed);
2225 return Err(VoxError::ConnectionClosed);
2226 }
2227 }
2228 }
2229 };
2230
2231 let PendingResponse {
2233 msg: response_msg,
2234 schemas: response_schemas,
2235 fds: response_fds,
2236 } = pending;
2237 let response = response_msg.map(|m| match m.body {
2238 RequestBody::Response(r) => r,
2239 _ => unreachable!("pending_responses only gets Response variants"),
2240 });
2241
2242 finish_request(RpcOutcome::Ok);
2243 Ok(vox_types::WithTracker {
2244 value: response,
2245 tracker: response_schemas,
2246 fds: response_fds,
2247 })
2248 }
2249}
2250
/// Per-connection driver state, generic over the handler `H`.
///
/// NOTE(review): the field comments below are inferred from field types and
/// from how the matching fields of `DriverCaller` are used; the code that
/// reads these fields is not visible here. Confirm against the `Driver` impl.
pub struct Driver<H: Handler<DriverReplySink>> {
    // Sender for outgoing connection messages.
    sender: ConnectionSender,
    // Incoming messages from the session.
    rx: mpsc::Receiver<crate::session::RecvMessage>,
    // `(request, disposition)` pairs; presumably fed by `mark_failure`.
    failures_rx: mpsc::UnboundedReceiver<(RequestId, FailureDisposition)>,
    closed_rx: watch::Receiver<Option<ConnectionCloseReason>>,
    resumed_rx: watch::Receiver<u64>,
    // Presumably the sending side of `DriverCaller::resume_processed_rx`.
    resume_processed_tx: watch::Sender<u64>,
    peer_supports_retry: bool,
    // Receiving side of the local control queue (`DriverLocalControl`).
    local_control_rx: mpsc::UnboundedReceiver<DriverLocalControl>,
    handler: Arc<H>,
    shared: Arc<DriverShared>,
    in_flight_handlers: BTreeMap<RequestId, InFlightHandler>,
    handler_futs: FuturesUnordered<HandlerFut>,
    live_operations: Arc<SyncMutex<LiveOperationTracker>>,
    // Sending side of the local control queue, cloned into sinks/binders.
    local_control_tx: mpsc::UnboundedSender<DriverLocalControl>,
    drop_control_seed: Option<mpsc::UnboundedSender<DropControlRequest>>,
    drop_control_request: DropControlRequest,
    drop_guard: SyncMutex<Option<Weak<CallerDropGuard>>>,
}
2286
/// Control messages queued locally by channel sinks and credit replenishers.
enum DriverLocalControl {
    /// Queued by `DriverChannelSink::close_channel_on_drop`.
    CloseChannel {
        channel_id: ChannelId,
    },
    /// Queued by the credit replenisher when the channel's receiver is
    /// dropped.
    ResetChannel {
        channel_id: ChannelId,
    },
    /// Queued by the credit replenisher once enough consumed items have
    /// accumulated; `additional` is the number of credits to grant.
    GrantCredit {
        channel_id: ChannelId,
        additional: u32,
    },
}
2299
/// Credit replenisher for one receiving channel: counts consumed items and
/// queues `GrantCredit` control messages in batches.
struct DriverChannelCreditReplenisher {
    connection_id: ConnectionId,
    channel_id: ChannelId,
    debug_context: Option<ChannelDebugContext>,
    // Weak so the replenisher does not keep the shared driver state alive;
    // bookkeeping is skipped when the upgrade fails.
    shared: Weak<DriverShared>,
    // Number of consumed items that triggers a grant.
    threshold: u32,
    local_control_tx: mpsc::UnboundedSender<DriverLocalControl>,
    observer: Option<VoxObserverHandle>,
    // Items consumed since the last grant.
    pending: std::sync::Mutex<u32>,
}
2310
2311impl DriverChannelCreditReplenisher {
2312 fn new(
2313 connection_id: ConnectionId,
2314 channel_id: ChannelId,
2315 debug_context: Option<ChannelDebugContext>,
2316 shared: Weak<DriverShared>,
2317 initial_credit: u32,
2318 local_control_tx: mpsc::UnboundedSender<DriverLocalControl>,
2319 observer: Option<VoxObserverHandle>,
2320 ) -> Self {
2321 Self {
2322 connection_id,
2323 channel_id,
2324 debug_context,
2325 shared,
2326 threshold: (initial_credit / 2).max(1),
2327 local_control_tx,
2328 observer,
2329 pending: std::sync::Mutex::new(0),
2330 }
2331 }
2332}
2333
2334impl ChannelCreditReplenisher for DriverChannelCreditReplenisher {
2335 fn on_item_consumed(&self) {
2336 let mut pending = self.pending.lock().expect("pending credit mutex poisoned");
2337 *pending += 1;
2338 if let Some(shared) = self.shared.upgrade() {
2339 shared.record_item_consumed(self.channel_id);
2340 shared.record_pending_local_grant(self.channel_id, *pending);
2341 }
2342 if *pending < self.threshold {
2343 return;
2344 }
2345
2346 let additional = *pending;
2347 *pending = 0;
2348 if let Some(shared) = self.shared.upgrade() {
2349 shared.record_pending_local_grant(self.channel_id, additional);
2350 }
2351 let _ = self.local_control_tx.send(DriverLocalControl::GrantCredit {
2352 channel_id: self.channel_id,
2353 additional,
2354 });
2355 }
2356
2357 fn on_receiver_dropped(&self) {
2358 if let Some(shared) = self.shared.upgrade() {
2359 shared.record_receiver_dropped(self.channel_id);
2360 }
2361 let _ = self
2362 .local_control_tx
2363 .send(DriverLocalControl::ResetChannel {
2364 channel_id: self.channel_id,
2365 });
2366 }
2367
2368 fn channel_id(&self) -> Option<ChannelId> {
2369 Some(self.channel_id)
2370 }
2371
2372 fn connection_id(&self) -> Option<ConnectionId> {
2373 Some(self.connection_id)
2374 }
2375
2376 fn debug_context(&self) -> Option<ChannelDebugContext> {
2377 self.debug_context
2378 }
2379
2380 fn observer(&self) -> Option<VoxObserverHandle> {
2381 self.observer.clone()
2382 }
2383}
2384
2385impl<H: Handler<DriverReplySink>> Driver<H> {
2386 fn close_all_channel_runtime_state(&self, teardown: ChannelRuntimeTeardown) {
2388 let mut credits = self.shared.channel_credits.lock();
2389 for semaphore in credits.values() {
2390 semaphore.close();
2391 }
2392 let mut stale = self.shared.stale_close_channels.lock();
2395 stale.extend(credits.keys().copied());
2396 credits.clear();
2397 drop(credits);
2398
2399 let channel_senders = {
2400 let mut senders = self.shared.channel_senders.lock();
2401 std::mem::take(&mut *senders)
2402 };
2403 if let ChannelRuntimeTeardown::ConnectionClosed(reason) = teardown {
2404 for (channel_id, sender) in channel_senders {
2405 let _ = sender.force_send(IncomingChannelMessage::ConnectionClosed(reason));
2406 self.shared
2407 .observe_channel(channel_id, None, |channel| ChannelEvent::Closed {
2408 channel,
2409 reason: ChannelCloseReason::ConnectionClosed,
2410 });
2411 }
2412 }
2413 self.shared.channel_receivers.lock().clear();
2414 self.shared.terminal_channels.lock().clear();
2415 }
2416
2417 fn close_outbound_channel(&self, channel_id: ChannelId) {
2418 self.shared.terminal_channels.lock().insert(channel_id);
2419 if let Some(semaphore) = self.shared.channel_credits.lock().remove(&channel_id) {
2420 semaphore.close();
2421 }
2422 }
2423
2424 fn abort_channel_handlers(&mut self) {
2425 for in_flight in self.in_flight_handlers.values() {
2426 if self.handler.args_have_channels(in_flight.method_id) {
2427 if let Some(operation_id) = in_flight.operation_id {
2428 self.shared.operations.remove(operation_id);
2429 self.live_operations.lock().release(operation_id);
2430 }
2431 in_flight.abort.abort();
2432 }
2433 }
2434 }
2435
2436 pub fn new(handle: ConnectionHandle, handler: H) -> Self {
2437 Self::with_operation_store(handle, handler, Arc::new(InMemoryOperationStore::default()))
2438 }
2439
2440 pub fn with_operation_store(
2441 handle: ConnectionHandle,
2442 handler: H,
2443 operation_store: Arc<dyn OperationStore>,
2444 ) -> Self {
2445 let conn_id = handle.connection_id();
2446 let ConnectionHandle {
2447 sender,
2448 rx,
2449 failures_rx,
2450 control_tx,
2451 closed_rx,
2452 resumed_rx,
2453 local_settings,
2454 peer_settings,
2455 parity,
2456 peer_supports_retry,
2457 observer,
2458 } = handle;
2459 let drop_control_request = DropControlRequest::Close(conn_id);
2460 let (local_control_tx, local_control_rx) = mpsc::unbounded_channel("driver.local_control");
2461 let (resume_processed_tx, _resume_processed_rx) = watch::channel(0_u64);
2462 Self {
2463 sender,
2464 rx,
2465 failures_rx,
2466 closed_rx,
2467 resumed_rx,
2468 resume_processed_tx,
2469 peer_supports_retry,
2470 local_control_rx,
2471 handler: Arc::new(handler),
2472 shared: Arc::new(DriverShared {
2473 connection_id: conn_id,
2474 pending_responses: SyncMutex::new("driver.pending_responses", BTreeMap::new()),
2475 request_ids: SyncMutex::new("driver.request_ids", IdAllocator::new(parity)),
2476 next_operation_id: AtomicU64::new(1),
2477 operations: operation_store,
2478 channel_ids: SyncMutex::new("driver.channel_ids", IdAllocator::new(parity)),
2479 channel_senders: SyncMutex::new("driver.channel_senders", BTreeMap::new()),
2480 channel_receivers: SyncMutex::new("driver.channel_receivers", BTreeMap::new()),
2481 channel_credits: SyncMutex::new("driver.channel_credits", BTreeMap::new()),
2482 channel_contexts: SyncMutex::new("driver.channel_contexts", BTreeMap::new()),
2483 request_debug: SyncMutex::new("driver.request_debug", BTreeMap::new()),
2484 channel_debug: SyncMutex::new("driver.channel_debug", BTreeMap::new()),
2485 last_inbound_message_at: SyncMutex::new("driver.last_inbound_message_at", None),
2486 last_outbound_message_at: SyncMutex::new("driver.last_outbound_message_at", None),
2487 close_reason: SyncMutex::new("driver.close_reason", None),
2488 terminal_channels: SyncMutex::new("driver.terminal_channels", HashSet::new()),
2489 stale_close_channels: SyncMutex::new(
2490 "driver.stale_close_channels",
2491 std::collections::HashSet::new(),
2492 ),
2493 local_initial_channel_credit: local_settings.initial_channel_credit,
2494 peer_initial_channel_credit: peer_settings.initial_channel_credit,
2495 observer,
2496 }),
2497 in_flight_handlers: BTreeMap::new(),
2498 handler_futs: FuturesUnordered::new(),
2499 live_operations: Arc::new(SyncMutex::new(
2500 "driver.live_operations",
2501 LiveOperationTracker::new(),
2502 )),
2503 local_control_tx,
2504 drop_control_seed: control_tx,
2505 drop_control_request,
2506 drop_guard: SyncMutex::new("driver.drop_guard", None),
2507 }
2508 }
2509
2510 fn existing_drop_guard(&self) -> Option<Arc<CallerDropGuard>> {
2516 self.drop_guard.lock().as_ref().and_then(Weak::upgrade)
2517 }
2518
2519 fn connection_drop_guard(&self) -> Option<Arc<CallerDropGuard>> {
2520 if let Some(existing) = self.existing_drop_guard() {
2521 Some(existing)
2522 } else if let Some(seed) = &self.drop_control_seed {
2523 let mut guard = self.drop_guard.lock();
2524 if let Some(existing) = guard.as_ref().and_then(Weak::upgrade) {
2525 Some(existing)
2526 } else {
2527 let arc = Arc::new(CallerDropGuard {
2528 control_tx: seed.clone(),
2529 request: self.drop_control_request,
2530 });
2531 *guard = Some(Arc::downgrade(&arc));
2532 Some(arc)
2533 }
2534 } else {
2535 None
2536 }
2537 }
2538
2539 pub fn caller(&self) -> DriverCaller {
2540 let drop_guard = self.connection_drop_guard();
2541 DriverCaller {
2542 sender: self.sender.clone(),
2543 shared: Arc::clone(&self.shared),
2544 local_control_tx: self.local_control_tx.clone(),
2545 closed_rx: self.closed_rx.clone(),
2546 resumed_rx: self.resumed_rx.clone(),
2547 resume_processed_rx: self.resume_processed_tx.subscribe(),
2548 peer_supports_retry: self.peer_supports_retry,
2549 _drop_guard: drop_guard,
2550 }
2551 }
2552
2553 pub fn debug_snapshot(&self) -> VoxDebugSnapshot {
2555 self.shared.debug_snapshot(
2556 &self.sender,
2557 self.shared
2558 .connection_debug_state(self.closed_rx.borrow().is_some()),
2559 DriverTaskStatus::Alive,
2560 )
2561 }
2562
2563 pub fn dump_debug_snapshot(&self) -> VoxDebugSnapshot {
2564 let snapshot = self.debug_snapshot();
2565 tracing::info!(?snapshot, "vox debug snapshot");
2566 snapshot
2567 }
2568
2569 fn internal_binder(&self) -> DriverChannelBinder {
2570 DriverChannelBinder {
2571 sender: self.sender.clone(),
2572 shared: Arc::clone(&self.shared),
2573 local_control_tx: self.local_control_tx.clone(),
2574 drop_guard: self.existing_drop_guard(),
2575 }
2576 }
2577
2578 pub async fn run(&mut self) {
2583 let mut resumed_rx = self.resumed_rx.clone();
2584 let mut seen_resume_generation = *resumed_rx.borrow();
2585 loop {
2586 tracing::trace!("driver select loop top");
2587 tokio::select! {
2588 biased;
2589 changed = resumed_rx.changed() => {
2590 if changed.is_err() {
2591 tracing::trace!(
2592 conn_id = self.sender.connection_id().0,
2593 "resume notifier closed, exiting driver"
2594 );
2595 break;
2596 }
2597 let generation = *resumed_rx.borrow();
2598 if generation != seen_resume_generation {
2599 seen_resume_generation = generation;
2600 self.close_all_channel_runtime_state(ChannelRuntimeTeardown::DropOnly);
2601 self.abort_channel_handlers();
2602 let _ = self.resume_processed_tx.send(generation);
2603 }
2604 }
2605 Some(ctrl) = self.local_control_rx.recv() => {
2606 self.handle_local_control(ctrl).await;
2607 }
2608 Some((req_id, disposition)) = self.failures_rx.recv() => {
2609 tracing::trace!(%req_id, ?disposition, "failures_rx fired");
2610 let in_flight_found = self.in_flight_handlers.contains_key(&req_id);
2611 let in_flight_method_id =
2612 self.in_flight_handlers.get(&req_id).map(|in_flight| in_flight.method_id);
2613 let reply_disposition = self
2614 .in_flight_handlers
2615 .get(&req_id)
2616 .map(|in_flight| {
2617 let has_channels =
2618 self.handler.args_have_channels(in_flight.method_id);
2619 if has_channels && !in_flight.retry.idem {
2620 Some(FailureDisposition::Indeterminate)
2621 } else if has_channels && in_flight.retry.idem {
2622 None
2623 } else {
2624 Some(disposition)
2625 }
2626 })
2627 .unwrap_or(Some(disposition));
2628 tracing::trace!(%req_id, in_flight_found, ?reply_disposition, "failures_rx computed disposition");
2629 self.in_flight_handlers.remove(&req_id);
2631 self.shared.finish_request(req_id, RequestDebugState::Failed);
2632 tracing::trace!(%req_id, in_flight = self.in_flight_handlers.len(), "handler removed on failure");
2633 let had_pending = self.shared.pending_responses.lock().remove(&req_id).is_some();
2634 tracing::trace!(%req_id, had_pending, "failures_rx checked pending_responses");
2635 if !had_pending {
2636 let Some(reply_disposition) = reply_disposition else {
2637 tracing::trace!(%req_id, "failures_rx: no reply_disposition, skipping");
2638 continue;
2639 };
2640 tracing::trace!(%req_id, ?reply_disposition, "failures_rx: sending error response");
2641 let vox_error = match reply_disposition {
2642 FailureDisposition::Cancelled => VoxError::Cancelled,
2643 FailureDisposition::Indeterminate => VoxError::Indeterminate,
2644 };
2645 if let Some(method_id) = in_flight_method_id
2646 && let Some(response_shape) = self.handler.response_wire_shape(method_id)
2647 && let Ok(extracted) = vox_types::extract_schemas(response_shape)
2648 {
2649 let registry = vox_types::build_registry(&extracted.schemas);
2650 let error: Result<(), VoxError<core::convert::Infallible>> =
2651 Err(vox_error);
2652 let encoded = vox_postcard::to_vec(&error)
2653 .expect("serialize runtime-generated error response");
2654 let mut response = RequestResponse {
2655 ret: Payload::PostcardBytes(Box::leak(encoded.into_boxed_slice())),
2656 metadata: Default::default(),
2657 schemas: Default::default(),
2658 };
2659 self.sender.prepare_response_from_source(
2660 req_id,
2661 method_id,
2662 &extracted.root,
2663 ®istry,
2664 &mut response,
2665 );
2666 let _ = self.sender.send_response(req_id, response).await;
2667 } else {
2668 let error: Result<(), VoxError<core::convert::Infallible>> =
2669 Err(vox_error);
2670 let _ = self.sender.send_response(req_id, RequestResponse {
2671 ret: Payload::outgoing(&error),
2672 metadata: Default::default(),
2673 schemas: Default::default(),
2674 }).await;
2675 }
2676 tracing::trace!(%req_id, "failures_rx: error response sent");
2677 }
2678 }
2679 recv = self.rx.recv() => {
2680 match recv {
2681 Some(recv) => {
2682 self.handle_recv(recv).await;
2683 }
2684 None => {
2685 tracing::trace!("driver rx closed, exiting loop");
2686 break;
2687 }
2688 }
2689 }
2690 Some(item) = self.handler_futs.next(), if !self.handler_futs.is_empty() => {
2696 match item {
2697 Ok(HandlerCompletion::Finished(req_id)) => {
2698 let removed = self.in_flight_handlers.remove(&req_id).is_some();
2699 self.shared.finish_request(req_id, RequestDebugState::Finished);
2700 tracing::trace!(
2701 %req_id,
2702 removed,
2703 in_flight = self.in_flight_handlers.len(),
2704 "handler completion processed",
2705 );
2706 }
2707 Ok(HandlerCompletion::Panicked { request_id, method_id }) => {
2708 tracing::error!(
2709 req_id = ?request_id,
2710 ?method_id,
2711 "vox driver handler panicked; waiting for reply-sink failure path"
2712 );
2713 }
2714 Err(_aborted) => {
2715 }
2718 }
2719 }
2720 }
2721 }
2722
2723 for (_, in_flight) in std::mem::take(&mut self.in_flight_handlers) {
2724 if !in_flight.retry.persist {
2725 in_flight.abort.abort();
2726 }
2727 }
2728 self.shared.pending_responses.lock().clear();
2729 self.shared.request_debug.lock().clear();
2730 let close_reason =
2731 (*self.closed_rx.borrow()).unwrap_or(ConnectionCloseReason::SessionShutdown);
2732 self.shared.set_connection_closed(close_reason);
2733
2734 self.close_all_channel_runtime_state(ChannelRuntimeTeardown::ConnectionClosed(
2738 close_reason,
2739 ));
2740 }
2741
2742 async fn handle_local_control(&mut self, control: DriverLocalControl) {
2743 match control {
2744 DriverLocalControl::CloseChannel { channel_id } => {
2745 if self.shared.stale_close_channels.lock().remove(&channel_id) {
2750 tracing::trace!(%channel_id, "suppressing ChannelClose for stale channel");
2751 return;
2752 }
2753 self.close_outbound_channel(channel_id);
2754 self.shared
2755 .observe_channel(channel_id, None, |channel| ChannelEvent::Closed {
2756 channel,
2757 reason: ChannelCloseReason::Local,
2758 });
2759 self.shared.mark_outbound_progress();
2760 let _ = self
2761 .sender
2762 .send(ConnectionMessage::Channel(ChannelMessage {
2763 id: channel_id,
2764 body: ChannelBody::Close(ChannelClose {
2765 metadata: Default::default(),
2766 }),
2767 }))
2768 .await;
2769 }
2770 DriverLocalControl::ResetChannel { channel_id } => {
2771 self.shared.channel_senders.lock().remove(&channel_id);
2772 self.shared.channel_receivers.lock().remove(&channel_id);
2773 self.close_outbound_channel(channel_id);
2774 self.shared
2775 .observe_channel(channel_id, None, |channel| ChannelEvent::Reset {
2776 channel,
2777 reason: ChannelResetReason::Local,
2778 });
2779 self.shared.mark_outbound_progress();
2780 let _ = self
2781 .sender
2782 .send(ConnectionMessage::Channel(ChannelMessage {
2783 id: channel_id,
2784 body: ChannelBody::Reset(vox_types::ChannelReset {
2785 metadata: Default::default(),
2786 }),
2787 }))
2788 .await;
2789 }
2790 DriverLocalControl::GrantCredit {
2791 channel_id,
2792 additional,
2793 } => {
2794 self.shared.observe_channel(channel_id, None, |channel| {
2795 ChannelEvent::CreditGranted {
2796 channel,
2797 amount: additional,
2798 }
2799 });
2800 self.shared.mark_outbound_progress();
2801 let _ = self
2802 .sender
2803 .send(ConnectionMessage::Channel(ChannelMessage {
2804 id: channel_id,
2805 body: ChannelBody::GrantCredit(vox_types::ChannelGrantCredit {
2806 additional,
2807 }),
2808 }))
2809 .await;
2810 }
2811 }
2812 }
2813
2814 async fn handle_recv(&mut self, recv: crate::session::RecvMessage) {
2815 self.shared.mark_inbound_progress();
2816 let crate::session::RecvMessage { schemas, msg, fds } = recv;
2817 let msg_ref = msg.get();
2818 let is_request = matches!(msg_ref, ConnectionMessage::Request(_));
2819 if is_request {
2820 if let ConnectionMessage::Request(req) = msg_ref {
2821 vox_types::dlog!(
2822 "[driver] handle_recv request: conn={:?} req={:?} body={} method={:?}",
2823 self.sender.connection_id(),
2824 req.id,
2825 match &req.body {
2826 RequestBody::Call(_) => "Call",
2827 RequestBody::Response(_) => "Response",
2828 RequestBody::Cancel(_) => "Cancel",
2829 },
2830 match &req.body {
2831 RequestBody::Call(call) => Some(call.method_id),
2832 RequestBody::Response(_) | RequestBody::Cancel(_) => None,
2833 }
2834 );
2835 match &req.body {
2836 RequestBody::Call(call) => tracing::trace!(
2837 conn_id = self.sender.connection_id().0,
2838 req_id = req.id.0,
2839 method_id = call.method_id.0,
2840 "driver received call"
2841 ),
2842 RequestBody::Response(_) => tracing::trace!(
2843 conn_id = self.sender.connection_id().0,
2844 req_id = req.id.0,
2845 "driver received response message"
2846 ),
2847 RequestBody::Cancel(_) => tracing::trace!(
2848 conn_id = self.sender.connection_id().0,
2849 req_id = req.id.0,
2850 "driver received cancel message"
2851 ),
2852 }
2853 }
2854 let msg = msg.map(|m| match m {
2855 ConnectionMessage::Request(r) => r,
2856 _ => unreachable!(),
2857 });
2858 self.handle_request(msg, schemas, fds);
2859 } else {
2860 let msg = msg.map(|m| match m {
2861 ConnectionMessage::Channel(c) => c,
2862 _ => unreachable!(),
2863 });
2864 self.handle_channel(msg).await;
2865 }
2866 }
2867
    /// Handles one request-level message.
    ///
    /// * `Call`: resolves the retry policy and operation id, consults the
    ///   live-operation tracker and the operation store, and either replies
    ///   immediately from a spawned task (conflict, replay, indeterminate),
    ///   attaches to an existing operation, or starts the handler as an
    ///   abortable future tracked in `in_flight_handlers`.
    /// * `Response`: hands the message to the caller waiting in
    ///   `pending_responses`, or drops it when nobody is waiting.
    /// * `Cancel`: aborts or detaches the matching in-flight work according to
    ///   the live-operation tracker's answer.
    fn handle_request(
        &mut self,
        msg: SelfRef<RequestMessage<'static>>,
        schemas: Arc<vox_types::SchemaRecvTracker>,
        fds: vox_types::FrameFds,
    ) {
        let msg_ref = msg.get();
        let req_id = msg_ref.id;
        // Classify up front so the borrow of `msg` ends before it is consumed below.
        let is_call = matches!(&msg_ref.body, RequestBody::Call(_));
        let is_response = matches!(&msg_ref.body, RequestBody::Response(_));
        let is_cancel = matches!(&msg_ref.body, RequestBody::Cancel(_));

        if is_call {
            let method_id = match &msg_ref.body {
                RequestBody::Call(call) => call.method_id,
                _ => unreachable!(),
            };
            vox_types::dlog!(
                "[driver] inbound call: conn={:?} req={:?} method={:?}",
                self.sender.connection_id(),
                req_id,
                method_id
            );
            // Narrow the owned message down to its call body.
            let call = msg.map(|m| match m.body {
                RequestBody::Call(c) => c,
                _ => unreachable!(),
            });
            let call_ref = call.get();
            let handler = Arc::clone(&self.handler);
            let retry = handler.retry_policy(call_ref.method_id);
            // An operation id from the metadata is only honoured for methods
            // whose retry policy is not `idem`.
            let operation_id = metadata_operation_id(&call_ref.metadata).filter(|_| !retry.idem);
            let method_id = call_ref.method_id;

            if let Some(operation_id) = operation_id {
                // Step 1: ask the live tracker whether this request starts the
                // operation, attaches to one already known, or conflicts.
                let admit = self.live_operations.lock().admit(
                    operation_id,
                    call_ref.method_id,
                    incoming_args_bytes(call_ref),
                    retry,
                    req_id,
                );
                match admit {
                    // Attached: no handler is started and no reply is sent from here.
                    AdmitResult::Attached => return,
                    AdmitResult::Conflict => {
                        // Reject from a spawned task so this function stays synchronous.
                        let sender = self.sender.clone();
                        moire::task::spawn(
                            async move {
                                let error: Result<(), VoxError<core::convert::Infallible>> =
                                    Err(VoxError::InvalidPayload("operation ID conflict".into()));
                                let _ = sender
                                    .send_response(
                                        req_id,
                                        RequestResponse {
                                            ret: Payload::outgoing(&error),
                                            metadata: Default::default(),
                                            schemas: Default::default(),
                                        },
                                    )
                                    .await;
                            }
                            .named("operation_reject"),
                        );
                        return;
                    }
                    AdmitResult::Start => {}
                }

                // Step 2: consult the operation store for state recorded earlier.
                match self.shared.operations.lookup(operation_id) {
                    crate::OperationState::Sealed => {
                        // A stored response exists: replay it instead of running the handler.
                        // NOTE(review): when `get_sealed` returns `None` this arm falls
                        // through and the handler is started below — confirm that is intended.
                        if let Some(sealed) = self.shared.operations.get_sealed(operation_id) {
                            let sender = self.sender.clone();
                            let method_id = call_ref.method_id;
                            let response_shape = self.handler.response_wire_shape(method_id);
                            self.live_operations.lock().seal(operation_id);
                            moire::task::spawn(
                                async move {
                                    if replay_sealed_response(
                                        sender.clone(),
                                        req_id,
                                        method_id,
                                        sealed.response.as_bytes(),
                                        response_shape,
                                    )
                                    .await
                                    .is_err()
                                    {
                                        // Replay failed: report the request as cancelled.
                                        sender.mark_failure(req_id, FailureDisposition::Cancelled);
                                    }
                                }
                                .named("operation_replay"),
                            );
                            return;
                        }
                    }
                    crate::OperationState::Admitted => {
                        // The store has this operation as admitted but not sealed:
                        // answer `Indeterminate` without running the handler.
                        self.live_operations.lock().seal(operation_id);
                        let sender = self.sender.clone();
                        moire::task::spawn(
                            async move {
                                let error: Result<(), VoxError<core::convert::Infallible>> =
                                    Err(VoxError::Indeterminate);
                                let _ = sender
                                    .send_response(
                                        req_id,
                                        RequestResponse {
                                            ret: Payload::outgoing(&error),
                                            metadata: Default::default(),
                                            schemas: Default::default(),
                                        },
                                    )
                                    .await;
                            }
                            .named("operation_indeterminate"),
                        );
                        return;
                    }
                    crate::OperationState::Unknown => {
                        // First sighting: record the operation as admitted.
                        // (`operation_id` is only `Some` when `retry.idem` is false,
                        // so this condition always holds here.)
                        if !retry.idem {
                            self.shared.operations.admit(operation_id);
                        }
                    }
                }
            }
            // Step 3: run the handler. The reply sink carries everything needed
            // to send the response and, for operations, to update the stores.
            let reply = DriverReplySink {
                sender: Some(self.sender.clone()),
                request_id: req_id,
                method_id: call_ref.method_id,
                retry,
                operation_id,
                operations: operation_id.map(|_| Arc::clone(&self.shared.operations)),
                live_operations: operation_id.map(|_| Arc::clone(&self.live_operations)),
                binder: self.internal_binder(),
                handler_response_shape: handler.response_wire_shape(call_ref.method_id),
            };
            self.shared.start_request(
                req_id,
                method_id,
                None,
                None,
                RequestDebugState::Dispatching,
            );
            let (abort, abort_reg) = AbortHandle::new_pair();
            let handler_fut: Pin<Box<dyn MaybeSendFuture<Output = HandlerCompletion> + 'static>> =
                Box::pin(async move {
                    tracing::debug!(
                        req_id = ?req_id,
                        method_id = ?method_id,
                        "vox driver handler starting"
                    );
                    vox_types::dlog!(
                        "[driver] handler start: req={:?} method={:?}",
                        req_id,
                        method_id
                    );
                    // Catch panics so a failing handler is reported as a completion
                    // value instead of unwinding through the driver.
                    let result = AssertUnwindSafe(handler.handle(call, reply, schemas))
                        .catch_unwind()
                        .await;
                    if result.is_err() {
                        return HandlerCompletion::Panicked {
                            request_id: req_id,
                            method_id,
                        };
                    }
                    tracing::debug!(
                        req_id = ?req_id,
                        method_id = ?method_id,
                        "vox driver handler finished"
                    );
                    vox_types::dlog!(
                        "[driver] handler done: req={:?} method={:?}",
                        req_id,
                        method_id
                    );
                    HandlerCompletion::Finished(req_id)
                });
            self.handler_futs
                .push(Abortable::new(handler_fut, abort_reg));
            self.in_flight_handlers.insert(
                req_id,
                InFlightHandler {
                    abort,
                    method_id,
                    retry,
                    operation_id,
                },
            );
            tracing::trace!(%req_id, in_flight = self.in_flight_handlers.len(), "handler inserted");
        } else if is_response {
            vox_types::dlog!(
                "[driver] inbound response: conn={:?} req={:?}",
                self.sender.connection_id(),
                req_id
            );
            tracing::trace!(%req_id, "driver received response");
            // Deliver to the waiting caller; a send error is ignored.
            if let Some(tx) = self.shared.pending_responses.lock().remove(&req_id) {
                vox_types::dlog!("[driver] routing response to waiter: req={:?}", req_id);
                tracing::trace!(%req_id, "routing response to pending oneshot");
                let _: Result<(), _> = tx.send(PendingResponse { msg, schemas, fds });
            } else {
                vox_types::dlog!("[driver] dropped unmatched response: req={:?}", req_id);
                tracing::trace!(%req_id, "no pending response slot for this req_id");
            }
        } else if is_cancel {
            vox_types::dlog!(
                "[driver] inbound cancel: conn={:?} req={:?}",
                self.sender.connection_id(),
                req_id
            );
            tracing::trace!(%req_id, in_flight = self.in_flight_handlers.contains_key(&req_id), "received cancel");
            match self.live_operations.lock().cancel(req_id) {
                CancelResult::NotFound => {
                    // Not tracked as a live operation: abort the handler directly,
                    // unless its retry policy has `persist` set.
                    let should_abort = self
                        .in_flight_handlers
                        .get(&req_id)
                        .map(|in_flight| !in_flight.retry.persist)
                        .unwrap_or(false);
                    tracing::trace!(%req_id, should_abort, "cancel: not in live operations");
                    if should_abort && let Some(in_flight) = self.in_flight_handlers.remove(&req_id)
                    {
                        tracing::trace!(%req_id, "aborting handler");
                        in_flight.abort.abort();
                        self.shared
                            .finish_request(req_id, RequestDebugState::Failed);
                        tracing::trace!(%req_id, in_flight = self.in_flight_handlers.len(), "handler removed on cancel");
                    }
                }
                // Detached: nothing further is done for this cancel.
                CancelResult::Detached => {}
                CancelResult::Abort {
                    owner_request_id,
                    waiters,
                } => {
                    // Abort the owning handler, forget its operation in the store,
                    // and fail every request listed in `waiters`.
                    if let Some(in_flight) = self.in_flight_handlers.remove(&owner_request_id) {
                        if let Some(op_id) = in_flight.operation_id {
                            self.shared.operations.remove(op_id);
                        }
                        in_flight.abort.abort();
                        self.shared
                            .finish_request(owner_request_id, RequestDebugState::Failed);
                        tracing::trace!(%owner_request_id, in_flight = self.in_flight_handlers.len(), "owner handler removed on abort");
                    }
                    for waiter in waiters {
                        self.sender
                            .mark_failure(waiter, FailureDisposition::Cancelled);
                    }
                }
            }
        }
    }
3131
    /// Handles one inbound channel message (`Item`, `Close`, `Reset` or
    /// `GrantCredit`) for the channel named in the message.
    ///
    /// Items, closes and resets for a channel already in `terminal_channels`
    /// are ignored. Credit grants are always recorded and, when the channel
    /// still has a credit semaphore, added to it.
    async fn handle_channel(&mut self, msg: SelfRef<ChannelMessage<'static>>) {
        let msg_ref = msg.get();
        let chan_id = msg_ref.id;
        // Owned summary of the body so the borrow of `msg` ends before the
        // message is consumed by `msg.map` below.
        enum ChannelBodyKind {
            Item,
            Close,
            Reset,
            GrantCredit(u32),
        }
        let body_kind = match &msg_ref.body {
            ChannelBody::Item(_) => ChannelBodyKind::Item,
            ChannelBody::Close(_) => ChannelBodyKind::Close,
            ChannelBody::Reset(_) => ChannelBodyKind::Reset,
            ChannelBody::GrantCredit(grant) => ChannelBodyKind::GrantCredit(grant.additional),
        };

        match body_kind {
            ChannelBodyKind::Item => {
                // Items for a terminal channel are counted as not enqueued and dropped.
                if self.shared.terminal_channels.lock().contains(&chan_id) {
                    self.shared.record_inbound_item_not_enqueued(chan_id);
                    tracing::trace!(
                        conn_id = self.sender.connection_id().0,
                        channel_id = chan_id.0,
                        "driver dropped item for terminal channel"
                    );
                    return;
                }

                tracing::trace!(
                    conn_id = self.sender.connection_id().0,
                    channel_id = chan_id.0,
                    "driver received channel item"
                );
                let item = msg.map(|m| match m.body {
                    ChannelBody::Item(item) => item,
                    _ => unreachable!(),
                });
                let sender = self.shared.inbound_channel_sender(chan_id);
                if sender
                    .send(IncomingChannelMessage::Item(item))
                    .await
                    .is_err()
                {
                    // Delivery failed: drop local state for the channel and queue
                    // a local reset, which also sends a Reset to the peer.
                    self.shared.record_inbound_item_not_enqueued(chan_id);
                    self.shared.channel_senders.lock().remove(&chan_id);
                    self.shared.channel_receivers.lock().remove(&chan_id);
                    self.close_outbound_channel(chan_id);
                    let _ = self
                        .local_control_tx
                        .send(DriverLocalControl::ResetChannel {
                            channel_id: chan_id,
                        });
                    return;
                }
                self.shared
                    .observe_channel(chan_id, None, |channel| ChannelEvent::ItemReceived {
                        channel,
                    });
            }
            ChannelBodyKind::Close => {
                // A close for an already-terminal channel is ignored.
                if self.shared.terminal_channels.lock().contains(&chan_id) {
                    return;
                }
                let sender = self.shared.inbound_channel_sender(chan_id);
                tracing::trace!(
                    conn_id = self.sender.connection_id().0,
                    channel_id = chan_id.0,
                    "driver received channel close"
                );
                let close = msg.map(|m| match m.body {
                    ChannelBody::Close(close) => close,
                    _ => unreachable!(),
                });
                let delivered = sender
                    .send(IncomingChannelMessage::Close(close))
                    .await
                    .is_ok();
                // Whether or not the close was delivered, the channel becomes terminal.
                self.shared.channel_senders.lock().remove(&chan_id);
                self.shared.terminal_channels.lock().insert(chan_id);
                self.close_outbound_channel(chan_id);
                if !delivered {
                    // Delivery failed: also drop the receiver entry and skip the event.
                    self.shared.channel_receivers.lock().remove(&chan_id);
                    return;
                }
                self.shared
                    .observe_channel(chan_id, None, |channel| ChannelEvent::Closed {
                        channel,
                        reason: ChannelCloseReason::Remote,
                    });
            }
            ChannelBodyKind::Reset => {
                // A reset for an already-terminal channel is ignored.
                if self.shared.terminal_channels.lock().contains(&chan_id) {
                    return;
                }
                let sender = self.shared.inbound_channel_sender(chan_id);
                tracing::trace!(
                    conn_id = self.sender.connection_id().0,
                    channel_id = chan_id.0,
                    "driver received channel reset"
                );
                let reset = msg.map(|m| match m.body {
                    ChannelBody::Reset(reset) => reset,
                    _ => unreachable!(),
                });
                let delivered = sender
                    .send(IncomingChannelMessage::Reset(reset))
                    .await
                    .is_ok();
                // Same terminal bookkeeping as for a remote close.
                self.shared.channel_senders.lock().remove(&chan_id);
                self.shared.terminal_channels.lock().insert(chan_id);
                self.close_outbound_channel(chan_id);
                if !delivered {
                    self.shared.channel_receivers.lock().remove(&chan_id);
                    return;
                }
                self.shared
                    .observe_channel(chan_id, None, |channel| ChannelEvent::Reset {
                        channel,
                        reason: ChannelResetReason::Remote,
                    });
            }
            ChannelBodyKind::GrantCredit(additional) => {
                // Credit is recorded and reported even if the channel no longer
                // has a semaphore to receive the permits.
                self.shared.record_credit_received(chan_id, additional);
                self.shared.emit_channel_event(chan_id, None, |channel| {
                    ChannelEvent::CreditGranted {
                        channel,
                        amount: additional,
                    }
                });
                tracing::trace!(
                    conn_id = self.sender.connection_id().0,
                    channel_id = chan_id.0,
                    additional,
                    "driver received channel credit"
                );
                if let Some(semaphore) = self.shared.channel_credits.lock().get(&chan_id) {
                    semaphore.add_permits(additional as usize);
                }
            }
        }
    }
3279}