1#![allow(clippy::type_complexity)]
2
3use log::{debug, error, warn};
4
5use crate::{
6 check_concern,
7 context::{Context, RootContext},
8 downcast_box::DowncastBox,
9 grpc_call::GrpcCallResponse,
10 grpc_stream::{GrpcStreamClose, GrpcStreamHandle, GrpcStreamMessage},
11 hostcalls::{self, BufferType},
12 http::{
13 HttpContext, RequestBody, RequestHeaders, RequestTrailers, ResponseBody, ResponseHeaders,
14 ResponseTrailers,
15 },
16 http_call::HttpCallResponse,
17 property::envoy::Attributes,
18 queue::Queue,
19 stream::{DownstreamData, StreamClose, StreamContext, UpstreamData},
20 CloseType, FilterDataStatus, FilterHeadersStatus, FilterStreamStatus, FilterTrailersStatus,
21 GrpcCode,
22};
23use std::{
24 cell::{Cell, RefCell, RefMut},
25 collections::{hash_map::Entry, HashMap},
26 sync::{
27 atomic::{AtomicUsize, Ordering},
28 Mutex,
29 },
30};
31
32#[cfg(feature = "stream-metadata")]
33pub use crate::grpc_stream::{GrpcStreamInitialMetadata, GrpcStreamTrailingMetadata};
34
// Per-thread dispatcher state: each thread lazily builds its own `Dispatcher`
// through `Dispatcher::new()`.
thread_local! {
    static DISPATCHER: Dispatcher = Dispatcher::new();
}
// Global generation counter. `reset()` increments it, and `dispatch()` compares it
// with the generation stored in the thread-local dispatcher to decide whether that
// dispatcher must be cleared before use.
static DISPATCHER_GEN: AtomicUsize = AtomicUsize::new(0);
39
40pub(crate) fn reset() {
41 DISPATCHER_GEN.fetch_add(1, Ordering::Relaxed);
42 *ROOT_INIT.lock().unwrap() = None;
43}
44
45pub(crate) fn root_id() -> u32 {
46 DISPATCHER.with(|x| x.active_root_id.get())
47}
48
49fn dispatch<F, R>(f: F) -> R
50where
51 F: FnOnce(&Dispatcher) -> R,
52{
53 DISPATCHER.with(|d| {
54 let current_gen = DISPATCHER_GEN.load(Ordering::Relaxed);
55 if d.generation.get() != current_gen {
56 d.generation.set(current_gen);
57 d.reset();
58 }
59 f(d)
60 })
61}
62
// Process-wide factory that `Dispatcher::root` calls to build a root context the
// first time a root context id is seen. It is `None` until
// `set_root_context_factory` installs a factory, and `reset()` sets it back to `None`.
static ROOT_INIT: Mutex<Option<Box<dyn Fn() -> DowncastBox<dyn RootContext> + Send + Sync>>> =
    Mutex::new(None);
65
/// A pending HTTP call handler, stored in the dispatcher under the call token.
struct HttpCallback {
    /// Context that was active when the handler was registered.
    context_id: u32,
    /// Root context that was active when the handler was registered.
    root_context_id: u32,
    /// One-shot handler invoked with the owning root context and the response.
    callback: Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &HttpCallResponse)>,
}
71
/// A pending unary gRPC call handler, stored in the dispatcher under the call token.
struct GrpcCallback {
    /// Context that was active when the handler was registered.
    context_id: u32,
    /// Root context that was active when the handler was registered.
    root_context_id: u32,
    /// One-shot handler invoked with the owning root context and the call response.
    callback: Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &GrpcCallResponse)>,
}
77
/// The set of handlers registered for one gRPC stream, stored under the stream token.
///
/// Every handler slot is optional; each is filled by the matching
/// `register_grpc_stream_*` function.
#[derive(Default)]
struct GrpcStreamCallback {
    /// Context that was active when the entry for this stream was first created.
    context_id: u32,
    /// Root context that was active when the entry was first created.
    root_context_id: u32,
    /// One-shot handler run when the stream is closed.
    close: Option<Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &GrpcStreamClose)>>,
    /// Handler run for each message received on the stream.
    message: Option<
        Box<dyn FnMut(&mut DowncastBox<dyn RootContext>, GrpcStreamHandle, &GrpcStreamMessage)>,
    >,
    /// Handler run when initial metadata is received (`stream-metadata` feature only).
    #[cfg(feature = "stream-metadata")]
    initial_meta: Option<
        Box<
            dyn FnMut(
                &mut DowncastBox<dyn RootContext>,
                GrpcStreamHandle,
                &GrpcStreamInitialMetadata,
            ),
        >,
    >,
    /// Handler run when trailing metadata is received (`stream-metadata` feature only).
    #[cfg(feature = "stream-metadata")]
    trailer_meta: Option<
        Box<
            dyn FnMut(
                &mut DowncastBox<dyn RootContext>,
                GrpcStreamHandle,
                &GrpcStreamTrailingMetadata,
            ),
        >,
    >,
}
107
/// A live stream context together with the root context it was created under.
struct StreamInfo {
    /// Id of the root context that created this stream context.
    parent_context_id: u32,
    /// The user-provided stream context that receives the stream callbacks.
    data: Box<dyn StreamContext>,
}
112
/// A live HTTP context together with the root context it was created under.
struct HttpStreamInfo {
    /// Id of the root context that created this HTTP context.
    parent_context_id: u32,
    /// The user-provided HTTP context that receives the HTTP callbacks.
    data: Box<dyn HttpContext>,
}
117
/// Dispatcher entry for one root context.
struct RootInfo {
    /// The root context instance produced by the registered factory.
    data: DowncastBox<dyn RootContext>,
}
121
/// Thread-local registry of contexts and pending callbacks.
///
/// All fields use interior mutability because the dispatcher is only ever reached
/// through a shared reference.
#[derive(Default)]
struct Dispatcher {
    /// Root contexts, keyed by context id.
    roots: RefCell<HashMap<u32, RootInfo>>,
    /// Stream contexts, keyed by context id.
    streams: RefCell<HashMap<u32, StreamInfo>>,
    /// HTTP contexts, keyed by context id.
    http_streams: RefCell<HashMap<u32, HttpStreamInfo>>,
    /// Pending HTTP call handlers, keyed by call token.
    http_callbacks: RefCell<HashMap<u32, HttpCallback>>,
    /// Pending unary gRPC call handlers, keyed by call token.
    grpc_callbacks: RefCell<HashMap<u32, GrpcCallback>>,
    /// gRPC stream handler sets, keyed by stream token.
    grpc_streams: RefCell<HashMap<u32, GrpcStreamCallback>>,
    /// Queue handlers, keyed by the token given at registration and looked up by
    /// queue id in `on_queue_ready`.
    queue_callbacks:
        RefCell<HashMap<u32, Box<dyn FnMut(&mut DowncastBox<dyn RootContext>, Queue)>>>,
    /// Id of the context most recently marked as active.
    active_id: Cell<u32>,
    /// Id of the root context most recently marked as active.
    active_root_id: Cell<u32>,
    /// Value of `DISPATCHER_GEN` last observed by `dispatch`.
    generation: Cell<usize>,
}
136
137impl Dispatcher {
138 fn reset(&self) {
139 self.roots.borrow_mut().clear();
140 self.streams.borrow_mut().clear();
141 self.http_streams.borrow_mut().clear();
142 self.http_callbacks.borrow_mut().clear();
143 self.grpc_callbacks.borrow_mut().clear();
144 self.grpc_streams.borrow_mut().clear();
145 self.queue_callbacks.borrow_mut().clear();
146 self.roots.borrow_mut().clear();
147 self.active_id.set(0);
148 self.active_root_id.set(0);
149 }
150
151 fn root<'a>(
152 roots: &'a mut RefMut<'_, HashMap<u32, RootInfo>>,
153 root_context_id: u32,
154 ) -> &'a mut DowncastBox<dyn RootContext> {
155 roots.entry(root_context_id).or_insert_with(|| RootInfo {
156 data: ROOT_INIT
157 .lock()
158 .unwrap()
159 .as_ref()
160 .expect("missing root_context_factory")(),
161 });
162 &mut roots.get_mut(&root_context_id).unwrap().data
163 }
164}
165
166pub fn set_root_context_factory<R: RootContext + 'static>(root: fn() -> R) {
168 *ROOT_INIT.lock().unwrap() = Some(Box::new(move || DowncastBox::new(Box::new(root()))));
169}
170
171pub(crate) fn register_http_callback(
172 token: u32,
173 callback: Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &HttpCallResponse)>,
174) {
175 dispatch(|d| {
176 d.http_callbacks.borrow_mut().insert(
177 token,
178 HttpCallback {
179 context_id: d.active_id.get(),
180 root_context_id: d.active_root_id.get(),
181 callback,
182 },
183 )
184 });
185}
186
187pub(crate) fn register_grpc_callback(
188 token: u32,
189 callback: Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &GrpcCallResponse)>,
190) {
191 dispatch(|d| {
192 d.grpc_callbacks.borrow_mut().insert(
193 token,
194 GrpcCallback {
195 context_id: d.active_id.get(),
196 root_context_id: d.active_root_id.get(),
197 callback,
198 },
199 )
200 });
201}
202
/// Registers the initial-metadata handler for the gRPC stream identified by `token`.
///
/// If no entry exists for `token`, one is created and bound to the currently active
/// context and root context. If an entry exists for the same context, its handler is
/// replaced. If it belongs to a different context, an error is logged and `callback`
/// is dropped.
#[cfg(feature = "stream-metadata")]
pub(crate) fn register_grpc_stream_initial_meta(
    token: u32,
    callback: Box<
        dyn FnMut(&mut DowncastBox<dyn RootContext>, GrpcStreamHandle, &GrpcStreamInitialMetadata),
    >,
) {
    dispatch(|d| {
        let context_id = d.active_id.get();
        let root_context_id = d.active_root_id.get();
        match d.grpc_streams.borrow_mut().entry(token) {
            Entry::Occupied(entry) if entry.get().context_id != context_id => {
                error!(
                    "mismatch in context for register_grpc_stream_initial_meta! {} != {}",
                    entry.get().context_id,
                    context_id
                );
            }
            Entry::Occupied(mut entry) => {
                entry.get_mut().initial_meta = Some(callback);
            }
            Entry::Vacant(entry) => {
                entry.insert(GrpcStreamCallback {
                    context_id,
                    root_context_id,
                    initial_meta: Some(callback),
                    ..Default::default()
                });
            }
        }
    });
}
235
236pub(crate) fn register_grpc_stream_message(
237 token: u32,
238 callback: Box<
239 dyn FnMut(&mut DowncastBox<dyn RootContext>, GrpcStreamHandle, &GrpcStreamMessage),
240 >,
241) {
242 dispatch(|d| {
243 let context_id = d.active_id.get();
244 let root_context_id = d.active_root_id.get();
245 match d.grpc_streams.borrow_mut().entry(token) {
246 Entry::Occupied(entry) if entry.get().context_id != context_id => {
247 error!(
248 "mismatch in context for register_grpc_stream_message! {} != {}",
249 entry.get().context_id,
250 context_id
251 );
252 }
253 Entry::Occupied(mut entry) => {
254 entry.get_mut().message = Some(callback);
255 }
256 Entry::Vacant(entry) => {
257 entry.insert(GrpcStreamCallback {
258 context_id,
259 root_context_id,
260 message: Some(callback),
261 ..Default::default()
262 });
263 }
264 }
265 });
266}
267
/// Registers the trailing-metadata handler for the gRPC stream identified by `token`.
///
/// A missing entry is created and bound to the currently active context and root
/// context. An existing entry owned by the same context gets its handler replaced.
/// An entry owned by another context is left untouched and an error is logged.
#[cfg(feature = "stream-metadata")]
pub(crate) fn register_grpc_stream_trailing_metadata(
    token: u32,
    callback: Box<
        dyn FnMut(&mut DowncastBox<dyn RootContext>, GrpcStreamHandle, &GrpcStreamTrailingMetadata),
    >,
) {
    dispatch(|dispatcher| {
        let context_id = dispatcher.active_id.get();
        let root_context_id = dispatcher.active_root_id.get();
        match dispatcher.grpc_streams.borrow_mut().entry(token) {
            Entry::Vacant(slot) => {
                slot.insert(GrpcStreamCallback {
                    context_id,
                    root_context_id,
                    trailer_meta: Some(callback),
                    ..Default::default()
                });
            }
            Entry::Occupied(mut slot) => {
                let owner = slot.get().context_id;
                if owner == context_id {
                    slot.get_mut().trailer_meta = Some(callback);
                } else {
                    error!(
                        "mismatch in context for register_grpc_stream_trailing_metadata! {} != {}",
                        owner, context_id
                    );
                }
            }
        }
    });
}
300
301pub(crate) fn register_grpc_stream_close(
302 token: u32,
303 callback: Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &GrpcStreamClose)>,
304) {
305 dispatch(|d| {
306 let context_id = d.active_id.get();
307 let root_context_id = d.active_root_id.get();
308 match d.grpc_streams.borrow_mut().entry(token) {
309 Entry::Occupied(entry) if entry.get().context_id != context_id => {
310 error!(
311 "mismatch in context for register_grpc_stream_close! {} != {}",
312 entry.get().context_id,
313 context_id
314 );
315 }
316 Entry::Occupied(mut entry) => {
317 entry.get_mut().close = Some(callback);
318 }
319 Entry::Vacant(entry) => {
320 entry.insert(GrpcStreamCallback {
321 context_id,
322 root_context_id,
323 close: Some(callback),
324 ..Default::default()
325 });
326 }
327 }
328 })
329}
330
331pub(crate) fn register_queue_callback<R: RootContext + 'static>(
332 token: u32,
333 mut callback: impl FnMut(&mut R, Queue) + 'static,
334) {
335 dispatch(|d| {
336 d.queue_callbacks.borrow_mut().insert(
337 token,
338 Box::new(move |root, queue| {
339 callback(
340 root.as_any_mut().downcast_mut().expect("invalid root type"),
341 queue,
342 )
343 }),
344 );
345 })
346}
347
/// Guard for a temporarily assumed effective context.
///
/// Built by `EffectiveContext::enter`; its `Drop` implementation switches the host
/// and the dispatcher back to the ids recorded here.
struct EffectiveContext {
    /// Label used in log messages about this guard.
    name: &'static str,
    /// Dispatcher `active_id` that was in effect before entering.
    prior: u32,
    /// Dispatcher `active_root_id` that was in effect before entering.
    prior_root: u32,
}
353
354impl EffectiveContext {
355 pub fn enter(id: u32, root_id: u32, name: &'static str) -> Option<Self> {
356 if let Err(e) = hostcalls::set_effective_context(id) {
357 debug!("failed to assume context {root_id}/{id} for {name}: {e:?}");
358 return None;
359 };
360 let (prior, prior_root) = dispatch(|d| {
361 let prior = d.active_id.get();
362 d.active_id.set(id);
363 let prior_root = d.active_root_id.get();
364 d.active_root_id.set(root_id);
365 (prior, prior_root)
366 });
367 Some(Self {
368 name,
369 prior,
370 prior_root,
371 })
372 }
373}
374
375impl Drop for EffectiveContext {
376 fn drop(&mut self) {
377 if let Err(e) = hostcalls::set_effective_context(self.prior) {
378 debug!("failed to reset context for {}: {e:?}", self.name);
379 };
380 dispatch(|d| {
381 d.active_id.set(self.prior);
382 d.active_root_id.set(self.prior_root);
383 });
384 }
385}
386
387impl Dispatcher {
388 fn new() -> Dispatcher {
389 Self::default()
390 }
391
392 fn do_create_subcontext(&self, root_context_id: u32, context_id: u32) {
393 let mut roots = self.roots.borrow_mut();
394 let root = Self::root(&mut roots, root_context_id);
395 match root.create_context() {
396 Context::Http(context) => {
397 if self
398 .http_streams
399 .borrow_mut()
400 .insert(
401 context_id,
402 HttpStreamInfo {
403 parent_context_id: root_context_id,
404 data: context,
405 },
406 )
407 .is_some()
408 {
409 warn!("reused context_id without proper cleanup");
410 }
411 }
412 Context::Stream(context) => {
413 if self
414 .streams
415 .borrow_mut()
416 .insert(
417 context_id,
418 StreamInfo {
419 parent_context_id: root_context_id,
420 data: context,
421 },
422 )
423 .is_some()
424 {
425 warn!("reused context_id without proper cleanup");
426 }
427 }
428 }
429 }
430
431 fn on_create_context(&self, context_id: u32, parent_context_id: u32) {
432 if parent_context_id == 0 {
433 let mut roots = self.roots.borrow_mut();
434 Self::root(&mut roots, context_id);
435 } else if self.roots.borrow().contains_key(&parent_context_id) {
436 self.do_create_subcontext(parent_context_id, context_id);
437 } else {
438 warn!("attempted to create context {context_id} under unknown context {parent_context_id}");
439 }
440 }
441
442 fn on_done(&self, context_id: u32) -> bool {
443 if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
444 self.active_id.set(context_id);
445 self.active_root_id.set(http_stream.parent_context_id);
446 http_stream.data.on_done()
447 } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
448 self.active_id.set(context_id);
449 self.active_root_id.set(stream.parent_context_id);
450 stream.data.on_done()
451 } else if self.roots.borrow().contains_key(&context_id) {
452 self.active_id.set(context_id);
453 self.active_root_id.set(context_id);
454 let mut roots = self.roots.borrow_mut();
455 Self::root(&mut roots, context_id).on_done()
456 } else {
457 warn!("on_done called on unknown context: {context_id}");
458 true
459 }
460 }
461
462 fn on_log(&self, context_id: u32) {
463 if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
464 self.active_id.set(context_id);
465 self.active_root_id.set(http_stream.parent_context_id);
466 http_stream.data.on_log();
467 } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
468 self.active_id.set(context_id);
469 self.active_root_id.set(stream.parent_context_id);
470 stream.data.on_log();
471 } else if self.roots.borrow().contains_key(&context_id) {
472 self.active_id.set(context_id);
473 self.active_root_id.set(context_id);
474 let mut roots = self.roots.borrow_mut();
475 Self::root(&mut roots, context_id).on_log();
476 } else {
477 warn!("on_log called on unknown context: {context_id}");
478 }
479 }
480
481 fn on_delete(&self, context_id: u32) {
482 if self.http_streams.borrow_mut().remove(&context_id).is_some() {
483 return;
484 }
485 if self.streams.borrow_mut().remove(&context_id).is_some() {
486 return;
487 }
488 if self.roots.borrow_mut().remove(&context_id).is_some() {
489 return;
490 }
491 warn!("deleting unknown context_id {context_id}");
492 }
493
494 fn on_vm_start(&self, context_id: u32, vm_configuration_size: usize) -> bool {
495 if !self.roots.borrow().contains_key(&context_id) {
496 warn!("received on_vm_start for non-root-context: {context_id}");
497 return true;
498 }
499 let Some(configuration) = check_concern(
500 "vm-start-config",
501 hostcalls::get_buffer(BufferType::VmConfiguration, 0, vm_configuration_size),
502 ) else {
503 return false;
504 };
505 self.active_id.set(context_id);
506 self.active_root_id.set(context_id);
507 let mut roots = self.roots.borrow_mut();
508 Self::root(&mut roots, context_id).on_vm_start(configuration)
509 }
510
511 fn on_configure(&self, context_id: u32, plugin_configuration_size: usize) -> bool {
512 if !self.roots.borrow().contains_key(&context_id) {
513 warn!("received on_configure for non-root-context: {context_id}");
514 return true;
515 }
516 let Some(configuration) = check_concern(
517 "configure-fetch",
518 hostcalls::get_buffer(
519 BufferType::PluginConfiguration,
520 0,
521 plugin_configuration_size,
522 ),
523 ) else {
524 return false;
525 };
526 self.active_id.set(context_id);
527 self.active_root_id.set(context_id);
528 let mut roots = self.roots.borrow_mut();
529 Self::root(&mut roots, context_id).on_configure(configuration)
530 }
531
532 fn on_tick(&self, context_id: u32) {
533 if !self.roots.borrow().contains_key(&context_id) {
534 warn!("received on_tick for non-root-context: {context_id}");
535 return;
536 }
537 self.active_id.set(context_id);
538 self.active_root_id.set(context_id);
539 let mut roots = self.roots.borrow_mut();
540 Self::root(&mut roots, context_id).on_tick();
541 }
542
543 fn on_queue_ready(&self, context_id: u32, queue_id: u32) {
544 if !self.roots.borrow().contains_key(&context_id) {
545 warn!("received on_queue_ready for non-root-context: {context_id}");
546 return;
547 }
548 if let Some(callback) = self.queue_callbacks.borrow_mut().get_mut(&queue_id) {
549 let mut roots = self.roots.borrow_mut();
550 callback(
551 &mut roots.get_mut(&context_id).unwrap().data,
552 Queue(queue_id),
553 );
554 }
555 }
556
557 fn on_new_connection(&self, context_id: u32) -> FilterStreamStatus {
558 let mut streams = self.streams.borrow_mut();
559 let stream = if let Some(context) = streams.get_mut(&context_id) {
560 context
561 } else {
562 warn!(
565 "no http context found for context (and was not implicitly created): {context_id}"
566 );
567 return FilterStreamStatus::Continue;
568 };
571 self.active_id.set(context_id);
572 self.active_root_id.set(stream.parent_context_id);
573 stream.data.on_new_connection()
574 }
575
576 fn on_downstream_data(
577 &self,
578 context_id: u32,
579 data_size: usize,
580 end_of_stream: bool,
581 ) -> FilterStreamStatus {
582 let mut streams = self.streams.borrow_mut();
583 let Some(stream) = streams.get_mut(&context_id) else {
584 return FilterStreamStatus::Continue;
585 };
586 self.active_id.set(context_id);
587 self.active_root_id.set(stream.parent_context_id);
588 stream.data.on_downstream_data(&DownstreamData {
589 data_size,
590 end_of_stream,
591 attributes: Attributes::get(),
592 })
593 }
594
595 fn on_downstream_close(&self, context_id: u32, close_type: CloseType) {
596 let mut streams = self.streams.borrow_mut();
597 let Some(stream) = streams.get_mut(&context_id) else {
598 return;
599 };
600 self.active_id.set(context_id);
601 self.active_root_id.set(stream.parent_context_id);
602 stream.data.on_downstream_close(&StreamClose {
603 close_type,
604 attributes: Attributes::get(),
605 })
606 }
607
608 fn on_upstream_data(
609 &self,
610 context_id: u32,
611 data_size: usize,
612 end_of_stream: bool,
613 ) -> FilterStreamStatus {
614 let mut streams = self.streams.borrow_mut();
615 let Some(stream) = streams.get_mut(&context_id) else {
616 return FilterStreamStatus::Continue;
617 };
618 self.active_id.set(context_id);
619 self.active_root_id.set(stream.parent_context_id);
620 stream.data.on_upstream_data(&UpstreamData {
621 data_size,
622 end_of_stream,
623 attributes: Attributes::get(),
624 })
625 }
626
627 fn on_upstream_close(&self, context_id: u32, close_type: CloseType) {
628 let mut streams = self.streams.borrow_mut();
629 let Some(stream) = streams.get_mut(&context_id) else {
630 return;
631 };
632 self.active_id.set(context_id);
633 self.active_root_id.set(stream.parent_context_id);
634 stream.data.on_upstream_close(&StreamClose {
635 close_type,
636 attributes: Attributes::get(),
637 })
638 }
639
640 fn on_http_request_headers(
641 &self,
642 context_id: u32,
643 header_count: usize,
644 end_of_stream: bool,
645 ) -> FilterHeadersStatus {
646 let mut http_streams = self.http_streams.borrow_mut();
647 let context = if let Some(context) = http_streams.get_mut(&context_id) {
648 context
649 } else {
650 warn!("no http context found for on_http_request_headers (and was not implicitly created): {context_id}");
653 return FilterHeadersStatus::Continue;
654 };
657 self.active_id.set(context_id);
658 self.active_root_id.set(context.parent_context_id);
659 context.data.on_http_request_headers(&RequestHeaders {
660 header_count,
661 end_of_stream,
662 attributes: Attributes::get(),
663 })
664 }
665
666 fn on_http_request_body(
667 &self,
668 context_id: u32,
669 body_size: usize,
670 end_of_stream: bool,
671 ) -> FilterDataStatus {
672 let mut http_streams = self.http_streams.borrow_mut();
673 let Some(context) = http_streams.get_mut(&context_id) else {
674 warn!("no http context found for on_http_request_body: {context_id}");
675 return FilterDataStatus::Continue;
676 };
677 self.active_id.set(context_id);
678 self.active_root_id.set(context.parent_context_id);
679 context.data.on_http_request_body(&RequestBody {
680 body_size,
681 end_of_stream,
682 attributes: Attributes::get(),
683 })
684 }
685
686 fn on_http_request_trailers(
687 &self,
688 context_id: u32,
689 trailer_count: usize,
690 ) -> FilterTrailersStatus {
691 let mut http_streams = self.http_streams.borrow_mut();
692 let Some(context) = http_streams.get_mut(&context_id) else {
693 warn!("no http context found for on_http_request_trailers: {context_id}");
694 return FilterTrailersStatus::Continue;
695 };
696 self.active_id.set(context_id);
697 self.active_root_id.set(context.parent_context_id);
698 context.data.on_http_request_trailers(&RequestTrailers {
699 trailer_count,
700 attributes: Attributes::get(),
701 })
702 }
703
704 fn on_http_response_headers(
705 &self,
706 context_id: u32,
707 header_count: usize,
708 end_of_stream: bool,
709 ) -> FilterHeadersStatus {
710 let mut http_streams = self.http_streams.borrow_mut();
711 let Some(context) = http_streams.get_mut(&context_id) else {
712 warn!("no http context found for on_http_response_headers: {context_id}");
713 return FilterHeadersStatus::Continue;
714 };
715 self.active_id.set(context_id);
716 self.active_root_id.set(context.parent_context_id);
717 context.data.on_http_response_headers(&ResponseHeaders {
718 header_count,
719 end_of_stream,
720 attributes: Attributes::get(),
721 })
722 }
723
724 fn on_http_response_body(
725 &self,
726 context_id: u32,
727 body_size: usize,
728 end_of_stream: bool,
729 ) -> FilterDataStatus {
730 let mut http_streams = self.http_streams.borrow_mut();
731 let Some(context) = http_streams.get_mut(&context_id) else {
732 warn!("no http context found for on_http_response_body: {context_id}");
733 return FilterDataStatus::Continue;
734 };
735 self.active_id.set(context_id);
736 self.active_root_id.set(context.parent_context_id);
737 context.data.on_http_response_body(&ResponseBody {
738 body_size,
739 end_of_stream,
740 attributes: Attributes::get(),
741 })
742 }
743
744 fn on_http_response_trailers(
745 &self,
746 context_id: u32,
747 trailer_count: usize,
748 ) -> FilterTrailersStatus {
749 let mut http_streams = self.http_streams.borrow_mut();
750 let Some(context) = http_streams.get_mut(&context_id) else {
751 warn!("no http context found for on_http_response_trailers: {context_id}");
752 return FilterTrailersStatus::Continue;
753 };
754 self.active_id.set(context_id);
755 self.active_root_id.set(context.parent_context_id);
756 context.data.on_http_response_trailers(&ResponseTrailers {
757 trailer_count,
758 attributes: Attributes::get(),
759 })
760 }
761
762 fn on_http_call_response(
763 &self,
764 token_id: u32,
765 num_headers: usize,
766 body_size: usize,
767 num_trailers: usize,
768 ) {
769 let Some(callback) = self.http_callbacks.borrow_mut().remove(&token_id) else {
770 debug!(
771 "received http_call_response for token {token_id}, but no callback was registered"
772 );
773 return;
774 };
775 let mut roots = self.roots.borrow_mut();
776 let Some(root) = roots.get_mut(&callback.root_context_id) else {
777 debug!("referenced non-existing root context");
778 return;
779 };
780 let Some(_ctx) = EffectiveContext::enter(
781 callback.context_id,
782 callback.root_context_id,
783 "http callback",
784 ) else {
785 return;
786 };
787 (callback.callback)(
788 &mut root.data,
789 &HttpCallResponse::new(num_headers, body_size, num_trailers),
790 );
791 }
792
793 #[cfg(feature = "stream-metadata")]
794 fn on_grpc_receive_initial_metadata(&self, token_id: u32, num_headers: u32) {
795 let mut grpc_streams = self.grpc_streams;
796 let Some(callback) = grpc_streams.get_mut(&token_id) else {
797 debug!("received grpc message for unknown token {token_id}");
798 return;
799 };
800 let Some(function) = &mut callback.initial_meta else {
801 return;
802 };
803 let mut roots = self.roots.borrow_mut();
804 let Some(root) = roots.get_mut(&callback.root_context_id) else {
805 debug!("referenced non-existing root context");
806 return;
807 };
808
809 let Some(_ctx) =
810 EffectiveContext::enter(callback.context_id, callback.root_context_id, "grpc stream")
811 else {
812 return;
813 };
814
815 function(
816 &mut root.data,
817 GrpcStreamHandle(token_id),
818 &GrpcStreamInitialMetadata::new(num_headers as usize),
819 );
820 }
821
822 fn on_grpc_receive(&self, token_id: u32, response_size: usize) {
823 if let Some(callback) = self.grpc_callbacks.borrow_mut().remove(&token_id) {
824 let mut roots = self.roots.borrow_mut();
825 let Some(root) = roots.get_mut(&callback.root_context_id) else {
826 debug!("referenced non-existing root context");
827 return;
828 };
829 let Some(_ctx) = EffectiveContext::enter(
830 callback.context_id,
831 callback.root_context_id,
832 "grpc callback",
833 ) else {
834 return;
835 };
836
837 (callback.callback)(
838 &mut root.data,
839 &GrpcCallResponse::new(token_id, GrpcCode::Ok, None, response_size),
840 );
841 } else if let Some(callback) = self.grpc_streams.borrow_mut().get_mut(&token_id) {
842 let Some(function) = &mut callback.message else {
843 return;
844 };
845 let mut roots = self.roots.borrow_mut();
846 let Some(root) = roots.get_mut(&callback.root_context_id) else {
847 debug!("referenced non-existing root context");
848 return;
849 };
850
851 let Some(_ctx) = EffectiveContext::enter(
852 callback.context_id,
853 callback.root_context_id,
854 "grpc stream",
855 ) else {
856 return;
857 };
858
859 function(
860 &mut root.data,
861 GrpcStreamHandle(token_id),
862 &GrpcStreamMessage::new(GrpcCode::Ok, None, response_size),
863 );
864 } else {
865 debug!("received grpc message for unknown token {token_id}");
866 }
867 }
868
869 #[cfg(feature = "stream-metadata")]
870 fn on_grpc_receive_trailing_metadata(&self, token_id: u32, num_headers: u32) {
871 let mut grpc_streams = self.grpc_streams.borrow_mut();
872 let Some(callback) = grpc_streams.get_mut(&token_id) else {
873 debug!("received grpc message for unknown token {token_id}");
874 return;
875 };
876 let Some(function) = &mut callback.trailer_meta else {
877 return;
878 };
879 let mut roots = self.roots.borrow_mut();
880 let Some(root) = roots.get_mut(&callback.root_context_id) else {
881 debug!("referenced non-existing root context");
882 return;
883 };
884 let Some(_ctx) =
885 EffectiveContext::enter(callback.context_id, callback.root_context_id, "grpc stream")
886 else {
887 return;
888 };
889
890 function(
891 &mut root.data,
892 GrpcStreamHandle(token_id),
893 &GrpcStreamTrailingMetadata::new(num_headers as usize),
894 );
895 }
896
897 fn on_grpc_close(&self, token_id: u32, status_code: u32) {
898 if let Some(callback) = self.grpc_callbacks.borrow_mut().remove(&token_id) {
899 let mut roots = self.roots.borrow_mut();
900 let Some(root) = roots.get_mut(&callback.root_context_id) else {
901 debug!("referenced non-existing root context");
902 return;
903 };
904 let Some(_ctx) = EffectiveContext::enter(
905 callback.context_id,
906 callback.root_context_id,
907 "grpc callback",
908 ) else {
909 return;
910 };
911 let Some((status, message)) =
912 check_concern("grpc-call-close-status", hostcalls::get_grpc_status())
913 else {
914 return;
915 };
916 if status != status_code {
917 warn!("status code mismatch for on_grpc_close");
918 }
919
920 (callback.callback)(
921 &mut root.data,
922 &GrpcCallResponse::new(token_id, status.into(), message, 0),
923 );
924 } else if let Some(callback) = self.grpc_streams.borrow_mut().remove(&token_id) {
925 let Some(function) = callback.close else {
926 return;
927 };
928 let mut roots = self.roots.borrow_mut();
929 let Some(root) = roots.get_mut(&callback.root_context_id) else {
930 debug!("referenced non-existing root context");
931 return;
932 };
933 let Some(_ctx) = EffectiveContext::enter(
934 callback.context_id,
935 callback.root_context_id,
936 "grpc stream",
937 ) else {
938 return;
939 };
940 let Some((status, message)) =
941 check_concern("grpc-stream-close-status", hostcalls::get_grpc_status())
942 else {
943 return;
944 };
945
946 function(
947 &mut root.data,
948 &GrpcStreamClose::new(token_id, status.into(), message),
949 );
950 } else {
951 debug!("received grpc close for unknown token {token_id}");
952 }
953 }
954}
955
956#[no_mangle]
957pub extern "C" fn proxy_on_context_create(context_id: usize, root_context_id: usize) {
958 dispatch(|d| d.on_create_context(context_id as u32, root_context_id as u32))
959}
960
961#[no_mangle]
962pub extern "C" fn proxy_on_done(context_id: usize) -> usize {
963 dispatch(|d| d.on_done(context_id as u32)) as usize
964}
965
966#[no_mangle]
967pub extern "C" fn proxy_on_log(context_id: usize) {
968 dispatch(|d| d.on_log(context_id as u32))
969}
970
971#[no_mangle]
972pub extern "C" fn proxy_on_delete(context_id: usize) {
973 dispatch(|d| d.on_delete(context_id as u32))
974}
975
976#[no_mangle]
977pub extern "C" fn proxy_on_vm_start(context_id: usize, vm_configuration_size: usize) -> usize {
978 dispatch(|d| d.on_vm_start(context_id as u32, vm_configuration_size)) as usize
979}
980
981#[no_mangle]
982pub extern "C" fn proxy_on_configure(context_id: usize, plugin_configuration_size: usize) -> usize {
983 dispatch(|d| d.on_configure(context_id as u32, plugin_configuration_size)) as usize
984}
985
986#[no_mangle]
987pub extern "C" fn proxy_on_tick(context_id: usize) {
988 dispatch(|d| d.on_tick(context_id as u32))
989}
990
991#[no_mangle]
992pub extern "C" fn proxy_on_queue_ready(context_id: usize, queue_id: usize) {
993 dispatch(|d| d.on_queue_ready(context_id as u32, queue_id as u32))
994}
995
996#[no_mangle]
997pub extern "C" fn proxy_on_new_connection(context_id: usize) -> FilterStreamStatus {
998 dispatch(|d| d.on_new_connection(context_id as u32))
999}
1000
1001#[no_mangle]
1002pub extern "C" fn proxy_on_downstream_data(
1003 context_id: usize,
1004 data_size: usize,
1005 end_of_stream: usize,
1006) -> FilterStreamStatus {
1007 dispatch(|d| d.on_downstream_data(context_id as u32, data_size, end_of_stream != 0))
1008}
1009
1010#[no_mangle]
1011pub extern "C" fn proxy_on_downstream_connection_close(context_id: usize, close_type: CloseType) {
1012 dispatch(|d| d.on_downstream_close(context_id as u32, close_type))
1013}
1014
1015#[no_mangle]
1016pub extern "C" fn proxy_on_upstream_data(
1017 context_id: usize,
1018 data_size: usize,
1019 end_of_stream: usize,
1020) -> FilterStreamStatus {
1021 dispatch(|d| d.on_upstream_data(context_id as u32, data_size, end_of_stream != 0))
1022}
1023
1024#[no_mangle]
1025pub extern "C" fn proxy_on_upstream_connection_close(context_id: usize, close_type: CloseType) {
1026 dispatch(|d| d.on_upstream_close(context_id as u32, close_type))
1027}
1028
1029#[no_mangle]
1030pub extern "C" fn proxy_on_request_headers(
1031 context_id: usize,
1032 num_headers: usize,
1033 end_of_stream: usize,
1034) -> FilterHeadersStatus {
1035 dispatch(|d| d.on_http_request_headers(context_id as u32, num_headers, end_of_stream != 0))
1036}
1037
1038#[no_mangle]
1039pub extern "C" fn proxy_on_request_body(
1040 context_id: usize,
1041 body_size: usize,
1042 end_of_stream: usize,
1043) -> FilterDataStatus {
1044 dispatch(|d| d.on_http_request_body(context_id as u32, body_size, end_of_stream != 0))
1045}
1046
1047#[no_mangle]
1048pub extern "C" fn proxy_on_request_trailers(
1049 context_id: usize,
1050 num_trailers: usize,
1051) -> FilterTrailersStatus {
1052 dispatch(|d| d.on_http_request_trailers(context_id as u32, num_trailers))
1053}
1054
1055#[no_mangle]
1056pub extern "C" fn proxy_on_response_headers(
1057 context_id: usize,
1058 num_headers: usize,
1059 end_of_stream: usize,
1060) -> FilterHeadersStatus {
1061 dispatch(|d| d.on_http_response_headers(context_id as u32, num_headers, end_of_stream != 0))
1062}
1063
1064#[no_mangle]
1065pub extern "C" fn proxy_on_response_body(
1066 context_id: usize,
1067 body_size: usize,
1068 end_of_stream: usize,
1069) -> FilterDataStatus {
1070 dispatch(|d| d.on_http_response_body(context_id as u32, body_size, end_of_stream != 0))
1071}
1072
1073#[no_mangle]
1074pub extern "C" fn proxy_on_response_trailers(
1075 context_id: usize,
1076 num_trailers: usize,
1077) -> FilterTrailersStatus {
1078 dispatch(|d| d.on_http_response_trailers(context_id as u32, num_trailers))
1079}
1080
1081#[no_mangle]
1082pub extern "C" fn proxy_on_http_call_response(
1083 _context_id: usize,
1084 token_id: usize,
1085 num_headers: usize,
1086 body_size: usize,
1087 num_trailers: usize,
1088) {
1089 dispatch(|d| d.on_http_call_response(token_id as u32, num_headers, body_size, num_trailers))
1090}
1091
/// Exported C-ABI hook that forwards to `Dispatcher::on_grpc_receive_initial_metadata`.
///
/// `_context_id` is ignored; the stream is found through `token_id` alone. The call
/// goes through `dispatch` like every other hook, so the generation check runs first.
#[cfg(feature = "stream-metadata")]
#[no_mangle]
pub extern "C" fn proxy_on_grpc_receive_initial_metadata(
    _context_id: usize,
    token_id: usize,
    headers: usize,
) {
    dispatch(|d| d.on_grpc_receive_initial_metadata(token_id as u32, headers as u32))
}
1102
1103#[no_mangle]
1104pub extern "C" fn proxy_on_grpc_receive(_context_id: usize, token_id: usize, response_size: usize) {
1105 dispatch(|d| d.on_grpc_receive(token_id as u32, response_size))
1106}
1107
/// Exported C-ABI hook that forwards to `Dispatcher::on_grpc_receive_trailing_metadata`.
///
/// `_context_id` is ignored; the stream is found through `token_id` alone. Both
/// values are narrowed to `u32`, the type the dispatcher method takes.
#[cfg(feature = "stream-metadata")]
#[no_mangle]
pub extern "C" fn proxy_on_grpc_receive_trailing_metadata(
    _context_id: usize,
    token_id: usize,
    trailers: usize,
) {
    dispatch(|d| d.on_grpc_receive_trailing_metadata(token_id as u32, trailers as u32))
}
1117
1118#[no_mangle]
1119pub extern "C" fn proxy_on_grpc_close(_context_id: usize, token_id: usize, status_code: usize) {
1120 dispatch(|d| d.on_grpc_close(token_id as u32, status_code as u32))
1121}